diff --git a/src/main/java/io/debezium/connector/vitess/Vgtid.java b/src/main/java/io/debezium/connector/vitess/Vgtid.java index cbbaf9c7..c9f8bab1 100644 --- a/src/main/java/io/debezium/connector/vitess/Vgtid.java +++ b/src/main/java/io/debezium/connector/vitess/Vgtid.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.DebeziumException; + import binlogdata.Binlogdata; /** Vitess source position coordinates. */ @@ -87,6 +89,15 @@ public List getShardGtids() { return shardGtids; } + public ShardGtid getShardGtid(String shard) { + for (ShardGtid shardGtid : shardGtids) { + if (shardGtid.shard.equals(shard)) { + return shardGtid; + } + } + throw new DebeziumException("Gtid for shard missing, shard: " + shard + "vgtid: " + this.rawVgtid.toString()); + } + public boolean isSingleShard() { return rawVgtid.getShardGtidsCount() == 1; } diff --git a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java index e747aeff..36c26130 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java +++ b/src/main/java/io/debezium/connector/vitess/VitessDatabaseSchema.java @@ -45,6 +45,7 @@ public VitessDatabaseSchema( schemaNameAdjuster, config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(), + config.getTransactionMetadataFactory().getTransactionStructMaker().getTransactionBlockSchema(), config.getFieldNamer(), false), false, diff --git a/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java b/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java index 91757e40..ee7b96b4 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java +++ b/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java @@ -10,9 +10,11 @@ import org.apache.kafka.connect.data.Struct; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionInfo; import io.debezium.data.Envelope; import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.txmetadata.TransactionInfo; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Collect; @@ -61,4 +63,15 @@ public String getTransactionId( return sourceInfo.getString(SourceInfo.VGTID_KEY); } + @Override + public TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) { + if (value == null || source == null) { + return null; + } + final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE); + String vgtid = sourceInfo.getString(SourceInfo.VGTID_KEY); + String shard = sourceInfo.getString(SourceInfo.SHARD_KEY); + return new VitessTransactionInfo(vgtid, shard); + } + } diff --git a/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java b/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java index 0573ce46..0ebce59d 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java +++ b/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java @@ -50,8 +50,11 @@ public static VitessOffsetContext initialContext( VitessConnectorConfig connectorConfig, Clock clock) { LOGGER.info("No previous offset exists. Use default VGTID."); final Vgtid defaultVgtid = VitessReplicationConnection.defaultVgtid(connectorConfig); - return new VitessOffsetContext( - connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), new TransactionContext()); + // use the other transaction context + TransactionContext transactionContext = connectorConfig.getTransactionMetadataFactory().getTransactionContext(); + VitessOffsetContext context = new VitessOffsetContext( + connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), transactionContext); + return context; } /** @@ -143,12 +146,15 @@ public Loader(VitessConnectorConfig connectorConfig) { @Override public VitessOffsetContext load(Map offset) { + LOGGER.info("Previous offset exists, load from {}", offset); final String vgtid = (String) offset.get(SourceInfo.VGTID_KEY); + TransactionContext transactionContext = connectorConfig.getTransactionMetadataFactory() + .getTransactionContext().newTransactionContextFromOffsets(offset); return new VitessOffsetContext( connectorConfig, Vgtid.of(vgtid), null, - TransactionContext.load(offset)); + transactionContext); } } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java b/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java index 283d14ad..5e6bb634 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java @@ -5,6 +5,12 @@ */ package io.debezium.connector.vitess; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; +import io.debezium.pipeline.txmetadata.TransactionStructMaker; import io.debezium.schema.SchemaFactory; public class VitessSchemaFactory extends SchemaFactory { @@ -18,4 +24,17 @@ public VitessSchemaFactory() { public static VitessSchemaFactory get() { return vitessSchemaFactoryObject; } + + public Schema getOrderedTransactionBlockSchema() { + Schema rankSchema = Decimal.schema(0).schema(); + return SchemaBuilder.struct().optional() + .name(TRANSACTION_BLOCK_SCHEMA_NAME) + .version(TRANSACTION_BLOCK_SCHEMA_VERSION) + .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA) + .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA) + .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA) + .field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA) + .field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, rankSchema) + .build(); + } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java index 87d7b655..24a6cbac 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java @@ -14,6 +14,7 @@ import io.debezium.connector.vitess.connection.ReplicationConnection; import io.debezium.connector.vitess.connection.ReplicationMessage; import io.debezium.connector.vitess.connection.ReplicationMessageProcessor; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionInfo; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; @@ -96,7 +97,8 @@ private ReplicationMessageProcessor newReplicationMessageProcessor(VitessPartiti offsetContext.rotateVgtid(newVgtid, message.getCommitTime()); if (message.getOperation() == ReplicationMessage.Operation.BEGIN) { // send to transaction topic - dispatcher.dispatchTransactionStartedEvent(partition, message.getTransactionId(), offsetContext, message.getCommitTime()); + VitessTransactionInfo transactionInfo = new VitessTransactionInfo(message.getTransactionId(), message.getShard()); + dispatcher.dispatchTransactionStartedEvent(partition, transactionInfo, offsetContext, message.getCommitTime()); } else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) { // send to transaction topic diff --git a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java index 0b368858..76c38796 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java @@ -14,11 +14,13 @@ public class TransactionalMessage implements ReplicationMessage { private final String transactionId; private final Instant commitTime; private final Operation operation; + private final String shard; - public TransactionalMessage(Operation operation, String transactionId, Instant commitTime) { + public TransactionalMessage(Operation operation, String transactionId, Instant commitTime, String shard) { this.transactionId = transactionId; this.commitTime = commitTime; this.operation = operation; + this.shard = shard; } @Override @@ -43,7 +45,7 @@ public String getTable() { @Override public String getShard() { - throw new UnsupportedOperationException(); + return shard; } @Override diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java index 36461353..9e2a4a0e 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java @@ -121,7 +121,7 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc } LOGGER.trace("Timestamp of begin transaction: {}", eventTimestamp); processor.process( - new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp), newVgtid, false); + new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp, vEvent.getShard()), newVgtid, false); } private void handleCommitMessage( @@ -135,7 +135,7 @@ private void handleCommitMessage( } LOGGER.trace("Timestamp of commit transaction: {}", commitTimestamp); processor.process( - new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp), newVgtid, false); + new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp, vEvent.getShard()), newVgtid, false); } private void decodeRows(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java new file mode 100644 index 00000000..bf8b10f1 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java @@ -0,0 +1,82 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +class Gtid { + + public String getVersion() { + return version; + } + + private String version = ""; + + public Set getHosts() { + return hosts; + } + + private Set hosts = new HashSet(); + + public List getSequenceValues() { + return sequenceValues; + } + + private List sequenceValues = new ArrayList(); + + private static final String PREFIX_LAST_CHAR = "/"; + + private static int getVersionEndIndex(String transactionId) { + return transactionId.indexOf(PREFIX_LAST_CHAR); + } + + private static String trimVersion(String transactionId) { + int index = getVersionEndIndex(transactionId); + if (index != -1) { + return transactionId.substring(index + 1); + } + return transactionId; + } + + private void initializeVersion(String transactionId) { + int index = getVersionEndIndex(transactionId); + if (index != -1) { + this.version = transactionId.substring(0, index); + } + } + + Gtid(String transactionId) { + initializeVersion(transactionId); + parseGtid(transactionId); + } + + private void parseGtid(String transactionId) { + transactionId = trimVersion(transactionId); + String[] transactions = transactionId.split(","); + for (String transaction : transactions) { + String[] parts = transaction.split(":"); + String hostname = parts[0]; + hosts.add(hostname); + String maxSequenceValue = parts[1].split("-")[1]; + sequenceValues.add(maxSequenceValue); + } + } + + public boolean isHostSetEqual(Gtid hosts) { + return this.hosts.equals(hosts.hosts); + } + + public boolean isHostSetSupersetOf(Gtid previousHosts) { + return this.hosts.containsAll(previousHosts.hosts); + } + + public boolean isHostSetSubsetOf(Gtid previousHosts) { + return previousHosts.hosts.containsAll(this.hosts); + } +} diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java new file mode 100644 index 00000000..c703c943 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java @@ -0,0 +1,96 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.debezium.DebeziumException; +import io.debezium.connector.vitess.Vgtid; + +public class VitessEpochProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(VitessEpochProvider.class); + private Map shardToEpoch = new HashMap<>(); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static boolean isInvalidGtid(String gtid) { + return gtid.equals(Vgtid.CURRENT_GTID) || gtid.equals(Vgtid.EMPTY_GTID); + } + + public static Long getEpochForGtid(Long previousEpoch, String previousGtidString, String gtidString) { + if (isInvalidGtid(previousGtidString)) { + return previousEpoch + 1; + } + if (isInvalidGtid(gtidString)) { + throw new DebeziumException("Invalid GTID: The current GTID cannot be one of current or empty " + gtidString); + } + Gtid previousGtid = new Gtid(previousGtidString); + Gtid gtid = new Gtid(gtidString); + if (previousGtid.isHostSetEqual(gtid) || gtid.isHostSetSupersetOf(previousGtid)) { + return previousEpoch; + } + else if (gtid.isHostSetSubsetOf(previousGtid)) { + return previousEpoch + 1; + } + else { + LOGGER.error( + "Error determining epoch, previous host set: {}, host set: {}", + previousGtid, gtid); + throw new RuntimeException("Can't determine epoch"); + } + } + + public Map store(Map offset) { + try { + offset.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch)); + return offset; + } + catch (JsonProcessingException e) { + throw new RuntimeException("Cannot store epoch: " + shardToEpoch.toString()); + } + } + + public void load(Map offsets) { + try { + String shardToEpochString = (String) offsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH); + if (shardToEpochString != null) { + shardToEpoch = MAPPER.readValue(shardToEpochString, new TypeReference>() { + }); + } + } + catch (JsonProcessingException e) { + throw new RuntimeException("Cannot read epoch: " + shardToEpoch.toString()); + } + } + + public Long getEpoch(String shard, String previousVgtidString, String vgtidString) { + if (previousVgtidString == null) { + long epoch = 0L; + storeEpoch(shard, epoch); + return epoch; + } + Vgtid vgtid = Vgtid.of(vgtidString); + Vgtid previousVgtid = Vgtid.of(previousVgtidString); + String previousGtid = previousVgtid.getShardGtid(shard).getGtid(); + String gtid = vgtid.getShardGtid(shard).getGtid(); + long previousEpoch = shardToEpoch.getOrDefault(shard, 0L); + long currentEpoch = getEpochForGtid(previousEpoch, previousGtid, gtid); + storeEpoch(shard, currentEpoch); + return currentEpoch; + } + + private void storeEpoch(String shard, long epoch) { + shardToEpoch.put(shard, epoch); + } +} diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java new file mode 100644 index 00000000..27fd528c --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java @@ -0,0 +1,120 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import java.math.BigDecimal; +import java.util.Map; + +import io.debezium.connector.vitess.Vgtid; +import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.pipeline.txmetadata.TransactionInfo; + +public class VitessOrderedTransactionContext extends TransactionContext { + public static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch"; + public static final String OFFSET_TRANSACTION_RANK = "transaction_rank"; + protected String previousVgtid = null; + protected Long transactionEpoch = 0L; + protected BigDecimal transactionRank = null; + private VitessEpochProvider epochProvider = new VitessEpochProvider(); + + public VitessOrderedTransactionContext() { + } + + public VitessOrderedTransactionContext(TransactionContext transactionContext) { + super(); + // Copy fields + super.setTransactionId(transactionContext.getTransactionId()); + super.putPerTableEventCount(transactionContext.getPerTableEventCount()); + super.setTotalEventCount(transactionContext.getTotalEventCount()); + } + + /** + * Stores the needed information for determining Vitess rank & Epoch. Example (excluding standard fields added by super class): + * Input Offset map: + * { + * "transaction_id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]} \ + * {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}" + * } + * Current shard to epoch map, in epoch provider: + * { + * "-80": 0, + * "80-", 1 + * } + * Output offset map: + * { + * "transaction_id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]} \ + * {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}" + * "transaction_epoch": { + * "-80": 0, + * "80-", 1 + * } + * } + * + * Note: there is no need to store the transaction rank. We get the previous transaction ID from the "transaction_id" field + * and use that to compute the epoch. Rank requires no state (sum of max offsets of all hosts). + * + * @param offset + * @return + */ + @Override + public Map store(Map offset) { + offset = super.store(offset); + return epochProvider.store(offset); + } + + @Override + public TransactionContext newTransactionContextFromOffsets(Map offsets) { + return VitessOrderedTransactionContext.load(offsets); + } + + public static VitessOrderedTransactionContext load(Map offsets) { + TransactionContext transactionContext = TransactionContext.load(offsets); + VitessOrderedTransactionContext vitessOrderedTransactionContext = new VitessOrderedTransactionContext(transactionContext); + vitessOrderedTransactionContext.previousVgtid = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID); + vitessOrderedTransactionContext.epochProvider.load(offsets); + return vitessOrderedTransactionContext; + } + + @Override + public void beginTransaction(TransactionInfo transactionInfo) { + super.beginTransaction(transactionInfo); + VitessTransactionInfo vitessTransactionInfo = (VitessTransactionInfo) transactionInfo; + beginTransaction(vitessTransactionInfo.getShard(), vitessTransactionInfo.getTransactionId()); + } + + @Override + public void endTransaction() { + super.endTransaction(); + this.transactionEpoch = null; + this.transactionRank = null; + } + + private void beginTransaction(String shard, String vgtid) { + this.transactionEpoch = this.epochProvider.getEpoch(shard, this.previousVgtid, vgtid); + this.transactionRank = VitessRankProvider.getRank(Vgtid.of(vgtid).getShardGtid(shard).getGtid()); + this.previousVgtid = vgtid; + } + + @Override + public String toString() { + return "VitessOrderedTransactionContext [currentTransactionId=" + getTransactionId() + ", perTableEventCount=" + + getPerTableEventCount() + ", totalEventCount=" + getTotalEventCount() + "]" + ", previousVgtid=" + previousVgtid + + ", transactionEpoch=" + transactionEpoch + ", transactionRank=" + transactionRank; + } + + public String getPreviousVgtid() { + return previousVgtid; + } + + public Long getTransactionEpoch() { + return transactionEpoch; + } + + public BigDecimal getTransactionRank() { + return transactionRank; + } + +} diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionMetadataFactory.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionMetadataFactory.java new file mode 100644 index 00000000..391a787e --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionMetadataFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.pipeline.txmetadata; + +import io.debezium.config.Configuration; +import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.pipeline.txmetadata.TransactionStructMaker; +import io.debezium.pipeline.txmetadata.spi.TransactionMetadataFactory; + +public class VitessOrderedTransactionMetadataFactory implements TransactionMetadataFactory { + + private final Configuration configuraiton; + + public VitessOrderedTransactionMetadataFactory(Configuration configuration) { + this.configuraiton = configuration; + } + + @Override + public TransactionContext getTransactionContext() { + return new VitessOrderedTransactionContext(); + } + + @Override + public TransactionStructMaker getTransactionStructMaker() { + return new VitessOrderedTransactionStructMaker(configuraiton); + } +} diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java new file mode 100644 index 00000000..65b15d21 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; + +import io.debezium.config.Configuration; +import io.debezium.connector.vitess.VitessSchemaFactory; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.txmetadata.AbstractTransactionStructMaker; +import io.debezium.pipeline.txmetadata.TransactionStructMaker; + +public class VitessOrderedTransactionStructMaker extends AbstractTransactionStructMaker implements TransactionStructMaker { + + public VitessOrderedTransactionStructMaker(Configuration config) { + super(config); + } + + /** + * Adds the transaction block to a change log message. Transaction block example: + * "transaction": { + * "id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}, + * {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}", + * "total_order": 1, + * "data_collection_order": 1, + * "transaction_epoch": 0, + * "transaction_rank": 853 + * } + * @param offsetContext + * @param dataCollectionEventOrder + * @param value + * @return Struct with ordered transaction metadata + */ + @Override + public Struct addTransactionBlock(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) { + Struct struct = super.addTransactionBlock(offsetContext, dataCollectionEventOrder, value); + return addOrderMetadata(struct, offsetContext); + } + + private Struct addOrderMetadata(Struct struct, OffsetContext offsetContext) { + VitessOrderedTransactionContext context = getVitessTransactionOrderMetadata(offsetContext); + struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank); + struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch); + return struct; + } + + private VitessOrderedTransactionContext getVitessTransactionOrderMetadata(OffsetContext offsetContext) { + return (VitessOrderedTransactionContext) offsetContext.getTransactionContext(); + } + + @Override + public Schema getTransactionBlockSchema() { + return VitessSchemaFactory.get().getOrderedTransactionBlockSchema(); + } +} diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProvider.java new file mode 100644 index 00000000..a06497fd --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProvider.java @@ -0,0 +1,20 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import java.math.BigDecimal; + +public class VitessRankProvider { + + public static BigDecimal getRank(String transactionId) { + Gtid gtid = new Gtid(transactionId); + BigDecimal rank = new BigDecimal("0"); + for (String sequenceValue : gtid.getSequenceValues()) { + rank = rank.add(new BigDecimal(sequenceValue)); + } + return rank; + } +} diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionInfo.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionInfo.java new file mode 100644 index 00000000..aa02a910 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionInfo.java @@ -0,0 +1,28 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import io.debezium.pipeline.txmetadata.TransactionInfo; + +public class VitessTransactionInfo implements TransactionInfo { + + private final String vgtid; + private final String shard; + + public VitessTransactionInfo(String vgtid, String shard) { + this.vgtid = vgtid; + this.shard = shard; + } + + @Override + public String getTransactionId() { + return this.vgtid; + } + + public String getShard() { + return this.shard; + } +} diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java index 6d229d25..c1c1c65a 100644 --- a/src/test/java/io/debezium/connector/vitess/TestHelper.java +++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java @@ -59,7 +59,7 @@ public class TestHelper { "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\"}" + "]"; - protected static final String VGTID_JSON_TEMPLATE = "[" + + public static final String VGTID_JSON_TEMPLATE = "[" + "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}," + "{\"keyspace\":\"%s\",\"shard\":\"%s\",\"gtid\":\"%s\",\"table_p_ks\":[]}" + "]"; diff --git a/src/test/java/io/debezium/connector/vitess/VgtidTest.java b/src/test/java/io/debezium/connector/vitess/VgtidTest.java index 702784e9..f8b9ac7d 100644 --- a/src/test/java/io/debezium/connector/vitess/VgtidTest.java +++ b/src/test/java/io/debezium/connector/vitess/VgtidTest.java @@ -14,12 +14,14 @@ import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_NO_PKS_TEMPLATE; import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_TEMPLATE; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; import org.junit.Test; import org.skyscreamer.jsonassert.JSONAssert; +import io.debezium.DebeziumException; import io.debezium.util.Collect; import binlogdata.Binlogdata; @@ -303,4 +305,19 @@ public void shouldEqualsIfEqualityHolds() { assertThat(vgtids.stream().allMatch(vgtid::equals)).isTrue(); } } + + @Test + public void shouldGetShardGtid() { + Vgtid vgtid1 = Vgtid.of(VGTID_JSON); + Vgtid.ShardGtid shardGtid = vgtid1.getShardGtid(TEST_SHARD); + assertThat(shardGtid).isEqualTo(new Vgtid.ShardGtid(TEST_KEYSPACE, TEST_SHARD, TEST_GTID)); + } + + @Test + public void shouldGetMissingShardGtidThrowsDebeziumException() { + Vgtid vgtid1 = Vgtid.of(VGTID_JSON); + assertThatThrownBy(() -> { + Vgtid.ShardGtid shardGtid = vgtid1.getShardGtid("missing_shard"); + }).isInstanceOf(DebeziumException.class); + } } diff --git a/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java b/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java index aa3f30cf..668d2e6a 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java @@ -125,7 +125,8 @@ public void shouldGetOldAndNewColumnValuesFromUpdate() { @Test(expected = UnsupportedOperationException.class) public void shouldNotSupportBeginMessage() { // setup fixture - ReplicationMessage message = new TransactionalMessage(ReplicationMessage.Operation.BEGIN, AnonymousValue.getString(), AnonymousValue.getInstant()); + ReplicationMessage message = new TransactionalMessage(ReplicationMessage.Operation.BEGIN, AnonymousValue.getString(), AnonymousValue.getInstant(), + AnonymousValue.getString()); // exercise SUT new VitessChangeRecordEmitter( @@ -140,7 +141,8 @@ public void shouldNotSupportBeginMessage() { @Test(expected = UnsupportedOperationException.class) public void shouldNotSupportCommitMessage() { // setup fixture - ReplicationMessage message = new TransactionalMessage(ReplicationMessage.Operation.COMMIT, AnonymousValue.getString(), AnonymousValue.getInstant()); + ReplicationMessage message = new TransactionalMessage(ReplicationMessage.Operation.COMMIT, AnonymousValue.getString(), AnonymousValue.getInstant(), + AnonymousValue.getString()); // exercise SUT new VitessChangeRecordEmitter( diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 02890042..c4b83b05 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -8,11 +8,13 @@ import static io.debezium.connector.vitess.TestHelper.TEST_SERVER; import static io.debezium.connector.vitess.TestHelper.TEST_SHARDED_KEYSPACE; import static io.debezium.connector.vitess.TestHelper.TEST_UNSHARDED_KEYSPACE; +import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_TEMPLATE; import static junit.framework.TestCase.assertEquals; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.math.BigDecimal; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -42,10 +44,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.connector.vitess.connection.VitessReplicationConnection; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessRankProvider; import io.debezium.converters.CloudEventsConverterTest; import io.debezium.converters.spi.CloudEventsMaker; import io.debezium.data.Envelope; @@ -469,6 +476,159 @@ public void shouldUsePrevVgtidAsOffsetWhenNoVgtidInGrpcResponse() throws Excepti Testing.Print.enable(); } + @Test + public void shouldProvideOrderedTransactionMetadata() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE); + TestHelper.applyVSchema("vitess_vschema.json"); + startConnector(config -> config + .with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class) + .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true), + true, + "-80,80-"); + assertConnectorIsRunning(); + + Vgtid baseVgtid = TestHelper.getCurrentVgtid(); + int expectedRecordsCount = 1; + consumer = testConsumer(expectedRecordsCount + 2); + + String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)"; + String insertQuery = "INSERT INTO numeric_table (" + + "tinyint_col," + + "tinyint_unsigned_col," + + "smallint_col," + + "smallint_unsigned_col," + + "mediumint_col," + + "mediumint_unsigned_col," + + "int_col," + + "int_unsigned_col," + + "bigint_col," + + "bigint_unsigned_col," + + "bigint_unsigned_overflow_col," + + "float_col," + + "double_col," + + "decimal_col," + + "boolean_col)" + + " VALUES " + rowValue; + StringBuilder insertRows = new StringBuilder().append(insertQuery); + for (int i = 1; i < expectedRecordsCount; i++) { + insertRows.append(", ").append(rowValue); + } + + String insertRowsStatement = insertRows.toString(); + + // exercise SUT + executeAndWait(insertRowsStatement, TEST_SHARDED_KEYSPACE); + // First transaction. + SourceRecord beginRecord = assertRecordBeginSourceRecord(); + assertThat(beginRecord.sourceOffset()).containsKey("transaction_epoch"); + String expectedTxId1 = ((Struct) beginRecord.value()).getString("id"); + Long expectedEpoch = 0L; + for (int i = 1; i <= expectedRecordsCount; i++) { + SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD); + Struct source = (Struct) ((Struct) record.value()).get("source"); + String shard = source.getString("shard"); + final Struct txn = ((Struct) record.value()).getStruct("transaction"); + String txId = txn.getString("id"); + assertThat(txId).isNotNull(); + assertThat(txId).isEqualTo(expectedTxId1); + assertThat(txn.get("transaction_epoch")).isEqualTo(expectedEpoch); + BigDecimal expectedRank = VitessRankProvider.getRank(Vgtid.of(expectedTxId1).getShardGtid(shard).getGtid()); + assertThat(txn.get("transaction_rank")).isEqualTo(expectedRank); + Vgtid actualVgtid = Vgtid.of(txId); + // The current vgtid is not the previous vgtid. + assertThat(actualVgtid).isNotEqualTo(baseVgtid); + } + assertRecordEnd(expectedTxId1, expectedRecordsCount); + } + + @Test + public void shouldIncrementEpochWhenFastForwardVgtidWithOrderedTransactionMetadata() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE); + TestHelper.applyVSchema("vitess_vschema.json"); + + ObjectMapper mapper = new ObjectMapper(); + Map srcPartition = Collect.hashMapOf(VitessPartition.SERVER_PARTITION_KEY, TEST_SERVER); + String currentVgtid = String.format( + VGTID_JSON_TEMPLATE, + TEST_SHARDED_KEYSPACE, + VgtidTest.TEST_SHARD, + Vgtid.CURRENT_GTID, + TEST_SHARDED_KEYSPACE, + VgtidTest.TEST_SHARD2, + Vgtid.CURRENT_GTID); + Map shardToEpoch = Map.of(VgtidTest.TEST_SHARD, 2L, VgtidTest.TEST_SHARD2, 3L); + Map offsetId = Map.of( + VitessOrderedTransactionContext.OFFSET_TRANSACTION_ID, currentVgtid, + VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, mapper.writeValueAsString(shardToEpoch), + SourceInfo.VGTID_KEY, currentVgtid); + Map, Map> offsets = Map.of(srcPartition, offsetId); + Configuration config = TestHelper.defaultConfig() + .with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class) + .with(CommonConnectorConfig.TOPIC_PREFIX, TEST_SERVER) + .with(VitessConnectorConfig.KEYSPACE, TEST_SHARDED_KEYSPACE) + .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) + .with(VitessConnectorConfig.SHARD, "-80,80-") + .build(); + + storeOffsets(config, offsets); + + startConnector(config); + assertConnectorIsRunning(); + + Vgtid baseVgtid = TestHelper.getCurrentVgtid(); + int expectedRecordsCount = 1; + consumer = testConsumer(expectedRecordsCount + 2); + + String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)"; + String insertQuery = "INSERT INTO numeric_table (" + + "tinyint_col," + + "tinyint_unsigned_col," + + "smallint_col," + + "smallint_unsigned_col," + + "mediumint_col," + + "mediumint_unsigned_col," + + "int_col," + + "int_unsigned_col," + + "bigint_col," + + "bigint_unsigned_col," + + "bigint_unsigned_overflow_col," + + "float_col," + + "double_col," + + "decimal_col," + + "boolean_col)" + + " VALUES " + rowValue; + StringBuilder insertRows = new StringBuilder().append(insertQuery); + for (int i = 1; i < expectedRecordsCount; i++) { + insertRows.append(", ").append(rowValue); + } + + String insertRowsStatement = insertRows.toString(); + + // exercise SUT + executeAndWait(insertRowsStatement, TEST_SHARDED_KEYSPACE); + // First transaction. + SourceRecord beginRecord = assertRecordBeginSourceRecord(); + assertThat(beginRecord.sourceOffset()).containsKey("transaction_epoch"); + String expectedTxId1 = ((Struct) beginRecord.value()).getString("id"); + for (int i = 1; i <= expectedRecordsCount; i++) { + SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD); + Struct source = (Struct) ((Struct) record.value()).get("source"); + String shard = source.getString("shard"); + Long expectedEpoch = shardToEpoch.get(shard) + 1; + final Struct txn = ((Struct) record.value()).getStruct("transaction"); + String txId = txn.getString("id"); + assertThat(txId).isNotNull(); + assertThat(txId).isEqualTo(expectedTxId1); + assertThat(txn.get("transaction_epoch")).isEqualTo(expectedEpoch); + BigDecimal expectedRank = VitessRankProvider.getRank(Vgtid.of(expectedTxId1).getShardGtid(shard).getGtid()); + assertThat(txn.get("transaction_rank")).isEqualTo(expectedRank); + Vgtid actualVgtid = Vgtid.of(txId); + // The current vgtid is not the previous vgtid. + assertThat(actualVgtid).isNotEqualTo(baseVgtid); + } + assertRecordEnd(expectedTxId1, expectedRecordsCount); + } + @Test @FixFor("DBZ-5063") public void shouldUseSameTransactionIdWhenMultiGrpcResponses() throws Exception { @@ -1223,10 +1383,16 @@ private void startConnector(Function fields = orderedTransactionBlockSchema.fields(); + assertThat(fields).contains(new Field(TransactionStructMaker.DEBEZIUM_TRANSACTION_ID_KEY, 0, Schema.STRING_SCHEMA)); + assertThat(fields).contains(new Field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, 1, Schema.INT64_SCHEMA)); + assertThat(fields).contains(new Field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, 2, Schema.INT64_SCHEMA)); + assertThat(fields).contains(new Field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, 3, Schema.INT64_SCHEMA)); + assertThat(fields).contains(new Field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, 4, Decimal.schema(0))); + } + + @Test + public void datatypeEnvelopeSchema() { + } +} diff --git a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java index da905db5..fee609c5 100644 --- a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java +++ b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java @@ -51,8 +51,10 @@ public void before() { @Test public void shouldProcessBeginEvent() throws Exception { // setup fixture + String expectedShard = "shard"; Binlogdata.VEvent event = Binlogdata.VEvent.newBuilder() .setType(Binlogdata.VEventType.BEGIN) + .setShard(expectedShard) .setTimestamp(AnonymousValue.getLong()) .build(); Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON); @@ -66,6 +68,7 @@ public void shouldProcessBeginEvent() throws Exception { assertThat(message).isNotNull(); assertThat(message).isInstanceOf(TransactionalMessage.class); assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.BEGIN); + assertThat(message.getShard()).isEqualTo(expectedShard); assertThat(message.getTransactionId()).isEqualTo(newVgtid.toString()); assertThat(vgtid).isEqualTo(newVgtid); processed[0] = true; @@ -102,10 +105,12 @@ public void shouldNotProcessBeginEventIfNoVgtid() throws Exception { @Test public void shouldProcessCommitEvent() throws Exception { + String expectedShard = "shard"; // setup fixture Binlogdata.VEvent event = Binlogdata.VEvent.newBuilder() .setType(Binlogdata.VEventType.COMMIT) .setTimestamp(AnonymousValue.getLong()) + .setShard(expectedShard) .build(); Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON); decoder.setTransactionId(newVgtid.toString()); @@ -119,6 +124,7 @@ public void shouldProcessCommitEvent() throws Exception { assertThat(message).isNotNull(); assertThat(message).isInstanceOf(TransactionalMessage.class); assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.COMMIT); + assertThat(message.getShard()).isEqualTo(expectedShard); assertThat(message.getTransactionId()).isEqualTo(newVgtid.toString()); assertThat(vgtid).isEqualTo(newVgtid); processed[0] = true; diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/GtidTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/GtidTest.java new file mode 100644 index 00000000..082b96ac --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/GtidTest.java @@ -0,0 +1,26 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Set; + +import org.junit.Test; + +public class GtidTest { + + @Test + public void shouldInit() { + String expectedVersion = "MySQL56"; + Gtid gtid = new Gtid(expectedVersion + "/host1:1-4,host2:2-10"); + assertThat(gtid.getVersion()).isEqualTo(expectedVersion); + assertThat(gtid.getSequenceValues()).isEqualTo(List.of("4", "10")); + assertThat(gtid.getHosts()).isEqualTo(Set.of("host1", "host2")); + } + +} diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java new file mode 100644 index 00000000..5b72ed0a --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java @@ -0,0 +1,142 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import static io.debezium.connector.vitess.TestHelper.VGTID_JSON_TEMPLATE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Java6Assertions.assertThatThrownBy; + +import java.util.Map; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import io.debezium.DebeziumException; +import io.debezium.connector.vitess.Vgtid; +import io.debezium.connector.vitess.VgtidTest; + +public class VitessEpochProviderTest { + + private String prefix = "MySQL56/"; + private String host1Tx1 = "027c67a2-c0b0-11ec-8a34-0ed0087913a5:1-11418261"; + private String host1Tx2 = "027c67a2-c0b0-11ec-8a34-0ed0087913a5:1-11418262"; + private String host2Tx1 = "08fb1cf3-0ce5-11ed-b921-0a8939501751:1-1443715"; + + private String previousTxId = prefix + String.join(",", host1Tx1, host2Tx1); + private String txId = prefix + String.join(",", host1Tx2, host2Tx1); + private String txIdShrunk = prefix + String.join(",", host1Tx2); + + private String txIdVersion5 = "MySQL57/" + String.join(",", host1Tx2); + private String txIdVersion8 = "MySQL82/" + String.join(",", host1Tx2); + + @Test + public void testGetEpochSameHostSet() { + Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId); + assertThat(epoch).isEqualTo(0); + } + + @Test + public void testGetEpochVgtid() { + VitessEpochProvider provider = new VitessEpochProvider(); + String expectedEpoch = "{\"-80\": 5}"; + provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch)); + Long epoch = provider.getEpoch("-80", VgtidTest.VGTID_JSON, VgtidTest.VGTID_JSON); + assertThat(epoch).isEqualTo(5); + } + + @Test + public void snapshotIncrementsEpoch() { + VitessEpochProvider provider = new VitessEpochProvider(); + String vgtidJsonEmpty = String.format( + VGTID_JSON_TEMPLATE, + VgtidTest.TEST_KEYSPACE, + VgtidTest.TEST_SHARD, + Vgtid.EMPTY_GTID, + VgtidTest.TEST_KEYSPACE, + VgtidTest.TEST_SHARD2, + Vgtid.EMPTY_GTID); + Long epoch = provider.getEpoch(VgtidTest.TEST_SHARD, vgtidJsonEmpty, VgtidTest.VGTID_JSON); + assertThat(epoch).isEqualTo(1L); + } + + @Test + public void fastForwardVgtidIncrementsEpoch() { + VitessEpochProvider provider = new VitessEpochProvider(); + String vgtidJsonCurrent = String.format( + VGTID_JSON_TEMPLATE, + VgtidTest.TEST_KEYSPACE, + VgtidTest.TEST_SHARD, + Vgtid.EMPTY_GTID, + VgtidTest.TEST_KEYSPACE, + VgtidTest.TEST_SHARD2, + Vgtid.EMPTY_GTID); + Long epoch = provider.getEpoch(VgtidTest.TEST_SHARD, vgtidJsonCurrent, VgtidTest.VGTID_JSON); + assertThat(epoch).isEqualTo(1L); + } + + @Test + public void testInvalidCurrentGtid() { + Long expectedEpoch = 0L; + VitessEpochProvider provider = new VitessEpochProvider(); + Long epoch = provider.getEpoch("-80", VgtidTest.VGTID_JSON, VgtidTest.VGTID_JSON); + assertThat(epoch).isEqualTo(expectedEpoch); + String vgtidJsonCurrent = String.format( + VGTID_JSON_TEMPLATE, + VgtidTest.TEST_KEYSPACE, + VgtidTest.TEST_SHARD, + Vgtid.EMPTY_GTID, + VgtidTest.TEST_KEYSPACE, + VgtidTest.TEST_SHARD2, + Vgtid.EMPTY_GTID); + assertThatThrownBy(() -> { + provider.getEpoch("-80", VgtidTest.VGTID_JSON, vgtidJsonCurrent); + }).isInstanceOf(DebeziumException.class).hasMessageContaining("Invalid"); + } + + @Test + public void testInvalidEmptyGtid() { + Long expectedEpoch = 0L; + VitessEpochProvider provider = new VitessEpochProvider(); + Long epoch = provider.getEpoch("-80", VgtidTest.VGTID_JSON, VgtidTest.VGTID_JSON); + assertThat(epoch).isEqualTo(expectedEpoch); + String vgtidJsonEmpty = String.format( + VGTID_JSON_TEMPLATE, + VgtidTest.TEST_KEYSPACE, + VgtidTest.TEST_SHARD, + Vgtid.EMPTY_GTID, + VgtidTest.TEST_KEYSPACE, + VgtidTest.TEST_SHARD2, + Vgtid.EMPTY_GTID); + assertThatThrownBy(() -> { + provider.getEpoch("-80", VgtidTest.VGTID_JSON, vgtidJsonEmpty); + }).isInstanceOf(DebeziumException.class).hasMessageContaining("Invalid"); + } + + @Test + public void testGetEpochShrunkHostSet() { + Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txIdShrunk); + assertThat(epoch).isEqualTo(1); + } + + @Test + public void testGetEpochExpandHostSet() { + Long epoch = VitessEpochProvider.getEpochForGtid(0L, previousTxId, txId); + assertThat(epoch).isEqualTo(0); + } + + @Test + public void testGetEpochDisjointThrowsException() { + Assertions.assertThatThrownBy(() -> { + VitessEpochProvider.getEpochForGtid(0L, previousTxId, "foo:1-2,bar:2-4"); + }).isInstanceOf(RuntimeException.class); + } + + @Test + public void testVersionUpgradeDoesNotAffectEpoch() { + Long epoch = VitessEpochProvider.getEpochForGtid(0L, txIdVersion5, txIdVersion8); + assertThat(epoch).isEqualTo(0L); + } +} diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java new file mode 100644 index 00000000..d4d5aed3 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java @@ -0,0 +1,112 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigDecimal; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.junit.Test; + +import io.debezium.connector.vitess.SourceInfo; +import io.debezium.connector.vitess.VgtidTest; +import io.debezium.pipeline.txmetadata.TransactionContext; + +public class VitessOrderedTransactionContextTest { + + private static final Schema sourceStructSchema = SchemaBuilder.struct().field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA); + + @Test + public void shouldInit() { + new VitessOrderedTransactionContext(); + } + + @Test + public void shouldLoad() { + String expectedId = VgtidTest.VGTID_JSON; + String expectedEpoch = "{\"-80\": 5}"; + Map offsets = Map.of( + VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch, + TransactionContext.OFFSET_TRANSACTION_ID, expectedId); + VitessOrderedTransactionContext context = VitessOrderedTransactionContext.load(offsets); + assertThat(context.previousVgtid).isEqualTo(expectedId); + context.beginTransaction(new VitessTransactionInfo(VgtidTest.VGTID_JSON, "-80")); + assertThat(context.transactionEpoch).isEqualTo(5); + } + + @Test + public void shouldLoadWithNull() { + String expectedId = null; + Long expectedEpoch = 0L; + Map offsets = Collections.emptyMap(); + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); + metadata.load(offsets); + assertThat(metadata.previousVgtid).isEqualTo(expectedId); + assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch); + } + + @Test + public void shouldUpdateEpoch() { + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); + + String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; + BigDecimal expectedRank = new BigDecimal("7"); + long expectedEpoch = 0; + String expectedShard = "-80"; + + VitessTransactionInfo transactionInfo = new VitessTransactionInfo(expectedTxId, expectedShard); + metadata.beginTransaction(transactionInfo); + assertThat(metadata.transactionRank).isEqualTo(expectedRank); + assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch); + + String expectedTxId2 = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"-80\"}]"; + BigDecimal expectedRank2 = new BigDecimal("3"); + long expectedEpoch2 = 1; + + VitessTransactionInfo transactionInfo2 = new VitessTransactionInfo(expectedTxId2, expectedShard); + metadata.beginTransaction(transactionInfo2); + assertThat(metadata.transactionRank).isEqualTo(expectedRank2); + assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch2); + } + + @Test + public void shouldUpdateRank() { + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); + + String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; + String expectedShard = "-80"; + + VitessTransactionInfo transactionInfo = new VitessTransactionInfo(expectedTxId, expectedShard); + metadata.beginTransaction(transactionInfo); + assertThat(metadata.transactionRank).isEqualTo(new BigDecimal(7)); + + String expectedTxId2 = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"-80\"}]"; + VitessTransactionInfo transactionInfo2 = new VitessTransactionInfo(expectedTxId2, expectedShard); + metadata.beginTransaction(transactionInfo2); + assertThat(metadata.transactionRank).isEqualTo(new BigDecimal(3)); + } + + @Test + public void shouldStoreOffsets() { + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); + + String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; + String expectedShard = "-80"; + + VitessTransactionInfo transactionInfo = new VitessTransactionInfo(expectedTxId, expectedShard); + metadata.beginTransaction(transactionInfo); + + Map offsets = new HashMap(); + String expectedEpoch = "{\"-80\":0}"; + Map actualOffsets = metadata.store(offsets); + assertThat(actualOffsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch); + } +} diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMakerTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMakerTest.java new file mode 100644 index 00000000..c5e7dc1b --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMakerTest.java @@ -0,0 +1,45 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigDecimal; +import java.time.Instant; + +import org.apache.kafka.connect.data.Struct; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.connector.vitess.TestHelper; +import io.debezium.connector.vitess.Vgtid; +import io.debezium.connector.vitess.VgtidTest; +import io.debezium.connector.vitess.VitessConnectorConfig; +import io.debezium.connector.vitess.VitessOffsetContext; +import io.debezium.connector.vitess.VitessSchemaFactory; +import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.txmetadata.TransactionContext; + +public class VitessOrderedTransactionStructMakerTest { + + @Test + public void prepareTxStruct() { + VitessConnectorConfig config = new VitessConnectorConfig(TestHelper.defaultConfig().build()); + VitessOrderedTransactionStructMaker maker = new VitessOrderedTransactionStructMaker(Configuration.empty()); + TransactionContext transactionContext = new VitessOrderedTransactionContext(); + transactionContext.beginTransaction(new VitessTransactionInfo(VgtidTest.VGTID_JSON, VgtidTest.TEST_SHARD)); + OffsetContext context = new VitessOffsetContext(config, Vgtid.of(VgtidTest.VGTID_JSON), Instant.now(), transactionContext); + Struct struct = maker.addTransactionBlock(context, 0, null); + assertThat(struct.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(0L); + assertThat(struct.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK)).isEqualTo(new BigDecimal(1513)); + } + + @Test + public void getTransactionBlockSchema() { + VitessOrderedTransactionStructMaker maker = new VitessOrderedTransactionStructMaker(Configuration.empty()); + assertThat(maker.getTransactionBlockSchema()).isEqualTo(VitessSchemaFactory.get().getOrderedTransactionBlockSchema()); + } +} diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProviderTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProviderTest.java new file mode 100644 index 00000000..4ff17202 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProviderTest.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigDecimal; + +import org.junit.Test; + +public class VitessRankProviderTest { + + @Test + public void shouldGetRankOneHost() { + String txId = "host1:1-4"; + BigDecimal rank = VitessRankProvider.getRank(txId); + assertThat(rank).isEqualTo(new BigDecimal(4)); + } + +} diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionInfoTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionInfoTest.java new file mode 100644 index 00000000..2574cca5 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionInfoTest.java @@ -0,0 +1,25 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.pipeline.txmetadata; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Test; + +public class VitessTransactionInfoTest { + + @Test + public void getTransactionId() { + String expectedId = "vgtid"; + assertThat(new VitessTransactionInfo(expectedId, "shard").getTransactionId()).isEqualTo(expectedId); + } + + @Test + public void getShard() { + String expectedShard = "shard"; + assertThat(new VitessTransactionInfo("vgtid", expectedShard).getShard()).isEqualTo(expectedShard); + } +}