From 71e4c64182e9e450b3d2cf93638c44d56592cda8 Mon Sep 17 00:00:00 2001 From: twthorn Date: Thu, 21 Mar 2024 20:03:16 -0500 Subject: [PATCH] DBZ-7698 Add transaction order metadata --- .../io/debezium/connector/vitess/Vgtid.java | 9 + .../vitess/VitessConnectorConfig.java | 15 +- .../vitess/VitessEventMetadataProvider.java | 8 +- .../connector/vitess/VitessOffsetContext.java | 22 ++- .../VStreamOutputMessageDecoder.java | 6 +- .../vitess/transaction/EpochProvider.java | 11 ++ .../connector/vitess/transaction/Gtid.java | 82 +++++++++ .../OrderedTransactionContext.java | 164 ++++++++++++++++++ .../vitess/transaction/RankProvider.java | 12 ++ .../transaction/VitessEpochProvider.java | 36 ++++ .../transaction/VitessRankProvider.java | 21 +++ .../connector/vitess/VitessConnectorIT.java | 141 ++++++++++++++- .../vitess/VitessOffsetContextTest.java | 27 +++ .../VStreamOutputMessageDecoderTest.java | 11 +- .../vitess/transaction/GtidTest.java | 26 +++ .../OrderedTransactionContextTest.java | 119 +++++++++++++ .../transaction/VitessEpochProviderTest.java | 61 +++++++ .../transaction/VitessRankProviderTest.java | 24 +++ 18 files changed, 778 insertions(+), 17 deletions(-) create mode 100644 src/main/java/io/debezium/connector/vitess/transaction/EpochProvider.java create mode 100644 src/main/java/io/debezium/connector/vitess/transaction/Gtid.java create mode 100644 src/main/java/io/debezium/connector/vitess/transaction/OrderedTransactionContext.java create mode 100644 src/main/java/io/debezium/connector/vitess/transaction/RankProvider.java create mode 100644 src/main/java/io/debezium/connector/vitess/transaction/VitessEpochProvider.java create mode 100644 src/main/java/io/debezium/connector/vitess/transaction/VitessRankProvider.java create mode 100644 src/test/java/io/debezium/connector/vitess/transaction/GtidTest.java create mode 100644 src/test/java/io/debezium/connector/vitess/transaction/OrderedTransactionContextTest.java create mode 100644 src/test/java/io/debezium/connector/vitess/transaction/VitessEpochProviderTest.java create mode 100644 src/test/java/io/debezium/connector/vitess/transaction/VitessRankProviderTest.java diff --git a/src/main/java/io/debezium/connector/vitess/Vgtid.java b/src/main/java/io/debezium/connector/vitess/Vgtid.java index cbbaf9c7..8cce24f8 100644 --- a/src/main/java/io/debezium/connector/vitess/Vgtid.java +++ b/src/main/java/io/debezium/connector/vitess/Vgtid.java @@ -87,6 +87,15 @@ public List getShardGtids() { return shardGtids; } + public ShardGtid getShardGtid(String shard) { + for (ShardGtid shardGtid : shardGtids) { + if (shardGtid.shard.equals(shard)) { + return shardGtid; + } + } + throw new RuntimeException("Gtid for shard missing, shard: " + shard + "vgtid: " + this.rawVgtid.toString()); + } + public boolean isSingleShard() { return rawVgtid.getShardGtidsCount() == 1; } diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index 37a7ef2a..ef166e21 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -389,6 +389,15 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue + "'precise' represents values as precise (Java's 'BigDecimal') values;" + "'long' represents values using Java's 'long', which may not offer the precision but will be far easier to use in consumers."); + public static final Field PROVIDE_ORDERED_TRANSACTION_METADATA = Field.create("provide.ordered.transaction.metadata") + .withDisplayName("Provide ordered transaction meatadata") + .withType(Type.BOOLEAN) + .withDefault(false) + .withWidth(Width.SHORT) + .withImportance(ConfigDef.Importance.HIGH) + .withDescription( + "Whether to provided ordered metadata on transactions"); + public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(VitessSourceInfoStructMaker.class.getName()); @@ -417,7 +426,7 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue .events( INCLUDE_UNKNOWN_DATATYPES, SOURCE_INFO_STRUCT_MAKER) - .connector(SNAPSHOT_MODE, BIGINT_UNSIGNED_HANDLING_MODE) + .connector(SNAPSHOT_MODE, BIGINT_UNSIGNED_HANDLING_MODE, PROVIDE_ORDERED_TRANSACTION_METADATA) .excluding(SCHEMA_EXCLUDE_LIST, SCHEMA_INCLUDE_LIST) .create(); @@ -617,4 +626,8 @@ public BigIntUnsignedHandlingMode getBigIntUnsgnedHandlingMode() { return BigIntUnsignedHandlingMode.parse(getConfig().getString(BIGINT_UNSIGNED_HANDLING_MODE), BIGINT_UNSIGNED_HANDLING_MODE.defaultValueAsString()); } + + public boolean shouldProvideOrderedTransactionMetadata() { + return Boolean.parseBoolean(getConfig().getString(PROVIDE_ORDERED_TRANSACTION_METADATA)); + } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java b/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java index 91757e40..a2c0cf74 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java +++ b/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java @@ -57,8 +57,10 @@ public String getTransactionId( } final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE); - // Use the entire VGTID as transaction id - return sourceInfo.getString(SourceInfo.VGTID_KEY); - } + String shard = sourceInfo.getString(SourceInfo.SHARD_KEY); + String jsonString = sourceInfo.getString(SourceInfo.VGTID_KEY); + Vgtid vgtid = Vgtid.of(jsonString); + return vgtid.getShardGtid(shard).getGtid(); + } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java b/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java index 0573ce46..062e1787 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java +++ b/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java @@ -16,6 +16,9 @@ import io.debezium.connector.SnapshotRecord; import io.debezium.connector.vitess.connection.VitessReplicationConnection; +import io.debezium.connector.vitess.transaction.OrderedTransactionContext; +import io.debezium.connector.vitess.transaction.VitessEpochProvider; +import io.debezium.connector.vitess.transaction.VitessRankProvider; import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; @@ -50,8 +53,16 @@ public static VitessOffsetContext initialContext( VitessConnectorConfig connectorConfig, Clock clock) { LOGGER.info("No previous offset exists. Use default VGTID."); final Vgtid defaultVgtid = VitessReplicationConnection.defaultVgtid(connectorConfig); + // use the other transaction context + TransactionContext transactionContext; + if (connectorConfig.shouldProvideOrderedTransactionMetadata()) { + transactionContext = new OrderedTransactionContext(new VitessEpochProvider(), new VitessRankProvider()); + } + else { + transactionContext = new TransactionContext(); + } return new VitessOffsetContext( - connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), new TransactionContext()); + connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), transactionContext); } /** @@ -144,11 +155,18 @@ public Loader(VitessConnectorConfig connectorConfig) { @Override public VitessOffsetContext load(Map offset) { final String vgtid = (String) offset.get(SourceInfo.VGTID_KEY); + TransactionContext transactionContext; + if (connectorConfig.shouldProvideOrderedTransactionMetadata()) { + transactionContext = OrderedTransactionContext.load(offset, new VitessEpochProvider(), new VitessRankProvider()); + } + else { + transactionContext = TransactionContext.load(offset); + } return new VitessOffsetContext( connectorConfig, Vgtid.of(vgtid), null, - TransactionContext.load(offset)); + transactionContext); } } } diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java index c29aa6e3..bbf18253 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java @@ -90,7 +90,7 @@ private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor pro Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); // Use the entire VGTID as transaction id if (newVgtid != null) { - this.transactionId = newVgtid.toString(); + this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid(); } processor.process( new DdlMessage(transactionId, eventTimestamp), newVgtid, false); @@ -101,7 +101,7 @@ private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor p Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); // Use the entire VGTID as transaction id if (newVgtid != null) { - this.transactionId = newVgtid.toString(); + this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid(); } processor.process( new OtherMessage(transactionId, eventTimestamp), newVgtid, false); @@ -112,7 +112,7 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); // Use the entire VGTID as transaction id. if (newVgtid != null) { - this.transactionId = newVgtid.toString(); + this.transactionId = newVgtid.getShardGtid(vEvent.getShard().toString()).getGtid(); } // Transaction ID must not be null in TransactionalMessage. if (this.transactionId == null) { diff --git a/src/main/java/io/debezium/connector/vitess/transaction/EpochProvider.java b/src/main/java/io/debezium/connector/vitess/transaction/EpochProvider.java new file mode 100644 index 00000000..43a0b64f --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/transaction/EpochProvider.java @@ -0,0 +1,11 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +interface EpochProvider { + Long getEpoch(Long previousEpoch, String previousTransactionId, String transactionId); + +} diff --git a/src/main/java/io/debezium/connector/vitess/transaction/Gtid.java b/src/main/java/io/debezium/connector/vitess/transaction/Gtid.java new file mode 100644 index 00000000..6c89d405 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/transaction/Gtid.java @@ -0,0 +1,82 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +class Gtid { + + public String getVersion() { + return version; + } + + private String version = ""; + + public Set getHosts() { + return hosts; + } + + private Set hosts = new HashSet(); + + public List getSequenceValues() { + return sequenceValues; + } + + private List sequenceValues = new ArrayList(); + + private static final String PREFIX_LAST_CHAR = "/"; + + private static int getVersionEndIndex(String transactionId) { + return transactionId.indexOf(PREFIX_LAST_CHAR); + } + + private static String trimVersion(String transactionId) { + int index = getVersionEndIndex(transactionId); + if (index != -1) { + return transactionId.substring(index + 1); + } + return transactionId; + } + + private void initializeVersion(String transactionId) { + int index = getVersionEndIndex(transactionId); + if (index != -1) { + this.version = transactionId.substring(0, index); + } + } + + Gtid(String transactionId) { + initializeVersion(transactionId); + parseGtid(transactionId); + } + + private void parseGtid(String transactionId) { + transactionId = trimVersion(transactionId); + String[] transactions = transactionId.split(","); + for (String transaction : transactions) { + String[] parts = transaction.split(":"); + String hostname = parts[0]; + hosts.add(hostname); + String maxSequenceValue = parts[1].split("-")[1]; + sequenceValues.add(maxSequenceValue); + } + } + + public boolean isHostSetEqual(Gtid hosts) { + return this.hosts.equals(hosts.hosts); + } + + public boolean isHostSetSupersetOf(Gtid previousHosts) { + return this.hosts.containsAll(previousHosts.hosts); + } + + public boolean isHostSetSubsetOf(Gtid previousHosts) { + return previousHosts.hosts.containsAll(this.hosts); + } +} diff --git a/src/main/java/io/debezium/connector/vitess/transaction/OrderedTransactionContext.java b/src/main/java/io/debezium/connector/vitess/transaction/OrderedTransactionContext.java new file mode 100644 index 00000000..563c71bd --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/transaction/OrderedTransactionContext.java @@ -0,0 +1,164 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; + +import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.spi.schema.DataCollectionId; + +public class OrderedTransactionContext extends TransactionContext { + + protected static final String OFFSET_TRANSACTION_ID = "transaction_id"; + protected static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch"; + protected static final String OFFSET_TRANSACTION_RANK = "transaction_rank"; + + private static final String OFFSET_TABLE_COUNT_PREFIX = "transaction_data_collection_order_"; + private static final int OFFSET_TABLE_COUNT_PREFIX_LENGTH = OFFSET_TABLE_COUNT_PREFIX.length(); + private String transactionId = null; + private final Map perTableEventCount = new HashMap(); + private final Map viewPerTableEventCount; + private long totalEventCount; + private String previousTransactionId = null; + private Long transactionEpoch; + private BigInteger transactionRank; + + private EpochProvider epochProvider; + private RankProvider rankProvider; + + public OrderedTransactionContext(EpochProvider epochProvider, RankProvider rankProvider) { + this.epochProvider = epochProvider; + this.rankProvider = rankProvider; + this.viewPerTableEventCount = Collections.unmodifiableMap(this.perTableEventCount); + this.totalEventCount = 0L; + this.transactionEpoch = 0L; + this.transactionRank = null; + } + + private void reset() { + this.transactionId = null; + this.totalEventCount = 0L; + this.perTableEventCount.clear(); + this.transactionRank = null; + } + + @Override + public Map store(Map offset) { + if (!Objects.isNull(this.transactionId)) { + offset.put(OFFSET_TRANSACTION_ID, this.transactionId); + } + if (!Objects.isNull(this.transactionEpoch)) { + offset.put(OFFSET_TRANSACTION_EPOCH, this.transactionEpoch); + } + if (!Objects.isNull(this.transactionRank)) { + offset.put(OFFSET_TRANSACTION_RANK, this.transactionRank.toString()); + } + + Iterator var3 = this.perTableEventCount.entrySet().iterator(); + + while (var3.hasNext()) { + Map.Entry e = (Map.Entry) var3.next(); + offset.put(OFFSET_TABLE_COUNT_PREFIX + e.getKey(), e.getValue()); + } + return offset; + } + + public static OrderedTransactionContext load(Map offsets, EpochProvider epochProvider, RankProvider rankProvider) { + OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider); + context.transactionId = (String) offsets.get(OFFSET_TRANSACTION_ID); + context.previousTransactionId = (String) offsets.get(OFFSET_TRANSACTION_ID); + + context.transactionEpoch = (Long) offsets.get(OFFSET_TRANSACTION_EPOCH); + String transactionRankString = (String) offsets.get(OFFSET_TRANSACTION_RANK); + if (transactionRankString == null) { + context.transactionRank = null; + } + else { + context.transactionRank = new BigInteger(transactionRankString); + } + + Iterator var3 = offsets.entrySet().iterator(); + + while (var3.hasNext()) { + Map.Entry offset = (Map.Entry) var3.next(); + if ((offset.getKey()).startsWith(OFFSET_TABLE_COUNT_PREFIX)) { + String dataCollectionId = (offset.getKey()).substring(OFFSET_TABLE_COUNT_PREFIX_LENGTH); + Long count = (Long) offset.getValue(); + context.perTableEventCount.put(dataCollectionId, count); + } + } + + context.totalEventCount = context.perTableEventCount.values().stream().mapToLong((x) -> x).sum(); + return context; + } + + @Override + public boolean isTransactionInProgress() { + return !Objects.isNull(this.transactionId); + } + + @Override + public String getTransactionId() { + return transactionId; + } + + @Override + public long getTotalEventCount() { + return this.totalEventCount; + } + + @Override + public void beginTransaction(String txId) { + this.previousTransactionId = this.transactionId; + this.reset(); + this.transactionId = txId; + transactionEpoch = this.epochProvider.getEpoch(this.transactionEpoch, previousTransactionId, txId); + transactionRank = this.rankProvider.getRank(txId); + } + + @Override + public void endTransaction() { + this.reset(); + } + + @Override + public long event(DataCollectionId source) { + ++this.totalEventCount; + String sourceName = source.toString(); + long dataCollectionEventOrder = (Long) this.perTableEventCount.getOrDefault(sourceName, 0L) + 1L; + this.perTableEventCount.put(sourceName, dataCollectionEventOrder); + return dataCollectionEventOrder; + } + + @Override + public Map getPerTableEventCount() { + return this.viewPerTableEventCount; + } + + @Override + public String toString() { + return "TransactionContext [" + + "currentTransactionId=" + this.transactionId + + ", perTableEventCount=" + this.perTableEventCount + + ", totalEventCount=" + this.totalEventCount + + ", transactionEpoch=" + this.transactionEpoch + + ", transactionRank=" + this.transactionRank + + "]"; + } + + public Long getTransactionEpoch() { + return transactionEpoch; + } + + public BigInteger getTransactionRank() { + return transactionRank; + } +} diff --git a/src/main/java/io/debezium/connector/vitess/transaction/RankProvider.java b/src/main/java/io/debezium/connector/vitess/transaction/RankProvider.java new file mode 100644 index 00000000..d47e4af6 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/transaction/RankProvider.java @@ -0,0 +1,12 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import java.math.BigInteger; + +interface RankProvider { + BigInteger getRank(String transactionId); +} diff --git a/src/main/java/io/debezium/connector/vitess/transaction/VitessEpochProvider.java b/src/main/java/io/debezium/connector/vitess/transaction/VitessEpochProvider.java new file mode 100644 index 00000000..85f80fbc --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/transaction/VitessEpochProvider.java @@ -0,0 +1,36 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VitessEpochProvider implements EpochProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(VitessEpochProvider.class); + + @Override + public Long getEpoch(Long previousEpoch, String previousTransactionId, String transactionId) { + if (previousTransactionId == null) { + return 0L; + } + + Gtid previousGtid = new Gtid(previousTransactionId); + Gtid gtid = new Gtid(transactionId); + if (previousGtid.isHostSetEqual(gtid) || gtid.isHostSetSupersetOf(previousGtid)) { + return previousEpoch; + } + else if (gtid.isHostSetSubsetOf(previousGtid)) { + return previousEpoch + 1; + } + else { + LOGGER.error( + "Error determining epoch, previous host set: {}, host set: {}", + previousGtid, gtid); + throw new RuntimeException("Can't determine epoch"); + } + } +} diff --git a/src/main/java/io/debezium/connector/vitess/transaction/VitessRankProvider.java b/src/main/java/io/debezium/connector/vitess/transaction/VitessRankProvider.java new file mode 100644 index 00000000..71f34db8 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/transaction/VitessRankProvider.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import java.math.BigInteger; + +public class VitessRankProvider implements RankProvider { + + @Override + public BigInteger getRank(String transactionId) { + Gtid gtid = new Gtid(transactionId); + BigInteger rank = new BigInteger("0"); + for (String sequenceValue : gtid.getSequenceValues()) { + rank = rank.add(new BigInteger(sequenceValue)); + } + return rank; + } +} diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 4cb5ad44..678465de 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -415,6 +415,131 @@ public void shouldUsePrevVgtidAsOffsetWhenNoVgtidInGrpcResponse() throws Excepti Testing.Print.enable(); } + @Test + @FixFor("") + public void shouldTransactionMetadataUseLocalShard() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE); + TestHelper.applyVSchema("vitess_vschema.json"); + startConnector(config -> config + .with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true) + .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) + .with(VitessConnectorConfig.SHARD, "-80,80-"), + true, + "80-"); + assertConnectorIsRunning(); + + Vgtid baseVgtid = TestHelper.getCurrentVgtid(); + int expectedRecordsCount = 1; + consumer = testConsumer(expectedRecordsCount + 2); + + String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)"; + String insertQuery = "INSERT INTO numeric_table (" + + "tinyint_col," + + "tinyint_unsigned_col," + + "smallint_col," + + "smallint_unsigned_col," + + "mediumint_col," + + "mediumint_unsigned_col," + + "int_col," + + "int_unsigned_col," + + "bigint_col," + + "bigint_unsigned_col," + + "bigint_unsigned_overflow_col," + + "float_col," + + "double_col," + + "decimal_col," + + "boolean_col)" + + " VALUES " + rowValue; + StringBuilder insertRows = new StringBuilder().append(insertQuery); + for (int i = 1; i < expectedRecordsCount; i++) { + insertRows.append(", ").append(rowValue); + } + + String insertRowsStatement = insertRows.toString(); + try { + // exercise SUT + executeAndWait(insertRowsStatement, TEST_SHARDED_KEYSPACE); + // First transaction. + SourceRecord beginRecord = assertRecordBeginSourceRecord(); + assertThat(beginRecord.sourceOffset()).containsKey("transaction_epoch"); + String expectedTxId1 = ((Struct) beginRecord.value()).getString("id"); + for (int i = 1; i <= expectedRecordsCount; i++) { + SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD); + final Struct txn = ((Struct) record.value()).getStruct("transaction"); + String txId = txn.getString("id"); + assertThat(txId).isNotNull(); + assertThat(txId).isEqualTo(expectedTxId1); + Vgtid actualVgtid = Vgtid.of(txId); + // The current vgtid is not the previous vgtid. + assertThat(actualVgtid).isNotEqualTo(baseVgtid); + } + assertRecordEnd(expectedTxId1, expectedRecordsCount); + } + catch (Exception e) { + } + } + + @Test + @FixFor("") + public void shouldTransactionMetadataUseLocalShard2() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE); + TestHelper.applyVSchema("vitess_vschema.json"); + startConnector(config -> config + .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) + .with(VitessConnectorConfig.SHARD, "-80,80-"), + true, + "80-"); + assertConnectorIsRunning(); + + Vgtid baseVgtid = TestHelper.getCurrentVgtid(); + int expectedRecordsCount = 1; + consumer = testConsumer(expectedRecordsCount + 2); + + String rowValue = "(1, 1, 12, 12, 123, 123, 1234, 1234, 12345, 12345, 18446744073709551615, 1.5, 2.5, 12.34, true)"; + String insertQuery = "INSERT INTO numeric_table (" + + "tinyint_col," + + "tinyint_unsigned_col," + + "smallint_col," + + "smallint_unsigned_col," + + "mediumint_col," + + "mediumint_unsigned_col," + + "int_col," + + "int_unsigned_col," + + "bigint_col," + + "bigint_unsigned_col," + + "bigint_unsigned_overflow_col," + + "float_col," + + "double_col," + + "decimal_col," + + "boolean_col)" + + " VALUES " + rowValue; + StringBuilder insertRows = new StringBuilder().append(insertQuery); + for (int i = 1; i < expectedRecordsCount; i++) { + insertRows.append(", ").append(rowValue); + } + + String insertRowsStatement = insertRows.toString(); + try { + // exercise SUT + executeAndWait(insertRowsStatement, TEST_SHARDED_KEYSPACE); + // First transaction. + String expectedTxId1 = assertRecordBegin(); + for (int i = 1; i <= expectedRecordsCount; i++) { + SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD); + final Struct txn = ((Struct) record.value()).getStruct("transaction"); + String txId = txn.getString("id"); + assertThat(txId).isNotNull(); + assertThat(txId).isEqualTo(expectedTxId1); + Vgtid actualVgtid = Vgtid.of(txId); + // The current vgtid is not the previous vgtid. + assertThat(actualVgtid).isNotEqualTo(baseVgtid); + } + assertRecordEnd(expectedTxId1, expectedRecordsCount); + } + catch (Exception e) { + } + } + @Test @FixFor("DBZ-5063") public void shouldUseSameTransactionIdWhenMultiGrpcResponses() throws Exception { @@ -472,12 +597,12 @@ public void shouldUseSameTransactionIdWhenMultiGrpcResponses() throws Exception for (int i = 1; i <= expectedRecordsCount1; i++) { SourceRecord record = assertRecordInserted(TEST_UNSHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD); final Struct txn = ((Struct) record.value()).getStruct("transaction"); + final Struct source = ((Struct) record.value()).getStruct("source"); String txId = txn.getString("id"); assertThat(txId).isNotNull(); assertThat(txId).isEqualTo(expectedTxId1); - Vgtid actualVgtid = Vgtid.of(txId); // The current vgtid is not the previous vgtid. - assertThat(actualVgtid).isNotEqualTo(baseVgtid); + assertThat(txId).isNotEqualTo(baseVgtid.getShardGtid(source.getString("shard")).getGtid()); } assertRecordEnd(expectedTxId1, expectedRecordsCount1); @@ -489,9 +614,8 @@ public void shouldUseSameTransactionIdWhenMultiGrpcResponses() throws Exception String txId = txn.getString("id"); assertThat(txId).isNotNull(); assertThat(txId).isEqualTo(expectedTxId2); - Vgtid actualVgtid = Vgtid.of(txId); // The current vgtid is not the previous vgtid. - assertThat(actualVgtid).isNotEqualTo(Vgtid.of(expectedTxId1)); + assertThat(txId).isNotEqualTo(expectedTxId1); } assertRecordEnd(expectedTxId2, expectedRecordsCount2); } @@ -1272,14 +1396,19 @@ private SourceRecord assertRecordUpdated(SourceRecord updatedRecord) { return updatedRecord; } + private SourceRecord assertRecordBeginSourceRecord() { + assertFalse("records not generated", consumer.isEmpty()); + SourceRecord record = consumer.remove(); + return record; + } + /** * Assert that the connector receives a valid BEGIN event. * * @return The transaction id */ private String assertRecordBegin() { - assertFalse("records not generated", consumer.isEmpty()); - SourceRecord record = consumer.remove(); + SourceRecord record = assertRecordBeginSourceRecord(); final Struct end = (Struct) record.value(); assertThat(end.getString("status")).isEqualTo("BEGIN"); return end.getString("id"); diff --git a/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java b/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java index d831159e..c8281b3e 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java @@ -13,6 +13,9 @@ import org.junit.Before; import org.junit.Test; +import io.debezium.connector.vitess.transaction.OrderedTransactionContext; +import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.util.Clock; import io.debezium.util.Collect; public class VitessOffsetContextTest { @@ -95,4 +98,28 @@ public void shouldResetToNewVGgtid() { new Vgtid.ShardGtid(TEST_KEYSPACE, TEST_SHARD, "new_gtid"), new Vgtid.ShardGtid(TEST_KEYSPACE, TEST_SHARD2, "new_gtid2")))); } + + @Test + public void shouldGetOrderedTransactionContext() { + VitessConnectorConfig config = new VitessConnectorConfig( + TestHelper.defaultConfig() + .with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true) + .build()); + VitessOffsetContext.Loader loader = new VitessOffsetContext.Loader(config); + Map offsets = Map.of(SourceInfo.VGTID_KEY, VGTID_JSON); + VitessOffsetContext context = loader.load(offsets); + TransactionContext transactionContext = context.getTransactionContext(); + assertThat(transactionContext).isInstanceOf(OrderedTransactionContext.class); + } + + @Test + public void shouldGetInitialOrderedTransactionContext() { + VitessConnectorConfig config = new VitessConnectorConfig( + TestHelper.defaultConfig() + .with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true) + .build()); + VitessOffsetContext context = VitessOffsetContext.initialContext(config, Clock.system()); + TransactionContext transactionContext = context.getTransactionContext(); + assertThat(transactionContext).isInstanceOf(OrderedTransactionContext.class); + } } diff --git a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java index da905db5..902b8dd8 100644 --- a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java +++ b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java @@ -54,8 +54,10 @@ public void shouldProcessBeginEvent() throws Exception { Binlogdata.VEvent event = Binlogdata.VEvent.newBuilder() .setType(Binlogdata.VEventType.BEGIN) .setTimestamp(AnonymousValue.getLong()) + .setShard(VgtidTest.TEST_SHARD) .build(); Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON); + String expectedGtid = newVgtid.getShardGtid(VgtidTest.TEST_SHARD).getGtid(); // exercise SUT final boolean[] processed = { false }; @@ -66,8 +68,8 @@ public void shouldProcessBeginEvent() throws Exception { assertThat(message).isNotNull(); assertThat(message).isInstanceOf(TransactionalMessage.class); assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.BEGIN); - assertThat(message.getTransactionId()).isEqualTo(newVgtid.toString()); - assertThat(vgtid).isEqualTo(newVgtid); + assertThat(message.getTransactionId()).isEqualTo(expectedGtid); + assertThat(vgtid).isEqualTo(vgtid); processed[0] = true; }, newVgtid, @@ -483,11 +485,13 @@ public void shouldSetRowEventsToCommitTimestamp() throws Exception { Long expectedCommitTimestamp = 2L; Binlogdata.VEvent beginEvent = Binlogdata.VEvent.newBuilder() .setType(Binlogdata.VEventType.BEGIN) + .setShard(VgtidTest.TEST_SHARD) .setTimestamp(expectedBeginTimestamp) .build(); Binlogdata.VEvent commitEvent = Binlogdata.VEvent.newBuilder() .setType(Binlogdata.VEventType.COMMIT) .setTimestamp(expectedCommitTimestamp) + .setShard(VgtidTest.TEST_SHARD) .build(); decoder.setCommitTimestamp(Instant.ofEpochSecond(commitEvent.getTimestamp())); decoder.processMessage(TestHelper.defaultFieldEvent(), null, null, false); @@ -544,14 +548,17 @@ public void shouldSetOtherEventsToEventTimestamp() throws Exception { Long expectedCommitTimestamp = 2L; Binlogdata.VEvent otherEvent = Binlogdata.VEvent.newBuilder() .setType(Binlogdata.VEventType.OTHER) + .setShard(VgtidTest.TEST_SHARD) .setTimestamp(expectedEventTimestamp) .build(); Binlogdata.VEvent ddlEvent = Binlogdata.VEvent.newBuilder() .setType(Binlogdata.VEventType.DDL) + .setShard(VgtidTest.TEST_SHARD) .setTimestamp(expectedEventTimestamp) .build(); Binlogdata.VEvent commitEvent = Binlogdata.VEvent.newBuilder() .setType(Binlogdata.VEventType.COMMIT) + .setShard(VgtidTest.TEST_SHARD) .setTimestamp(expectedCommitTimestamp) .build(); decoder.setCommitTimestamp(Instant.ofEpochSecond(commitEvent.getTimestamp())); diff --git a/src/test/java/io/debezium/connector/vitess/transaction/GtidTest.java b/src/test/java/io/debezium/connector/vitess/transaction/GtidTest.java new file mode 100644 index 00000000..700a61b4 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/transaction/GtidTest.java @@ -0,0 +1,26 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Set; + +import org.junit.Test; + +public class GtidTest { + + @Test + public void shouldInit() { + String expectedVersion = "MySQL56"; + Gtid gtid = new Gtid(expectedVersion + "/host1:1-4,host2:2-10"); + assertThat(gtid.getVersion()).isEqualTo(expectedVersion); + assertThat(gtid.getSequenceValues()).isEqualTo(List.of("4", "10")); + assertThat(gtid.getHosts()).isEqualTo(Set.of("host1", "host2")); + } + +} diff --git a/src/test/java/io/debezium/connector/vitess/transaction/OrderedTransactionContextTest.java b/src/test/java/io/debezium/connector/vitess/transaction/OrderedTransactionContextTest.java new file mode 100644 index 00000000..e13ad31f --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/transaction/OrderedTransactionContextTest.java @@ -0,0 +1,119 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +public class OrderedTransactionContextTest { + + @Test + public void shouldInit() { + EpochProvider epochProvider = new VitessEpochProvider(); + RankProvider rankProvider = new VitessRankProvider(); + OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider); + context.getTransactionId(); + } + + @Test + public void shouldLoad() { + String expectedId = "foo"; + Long expectedEpoch = 1L; + String expectedRank = "10"; + Map offsets = Map.of( + OrderedTransactionContext.OFFSET_TRANSACTION_ID, expectedId, + OrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch, + OrderedTransactionContext.OFFSET_TRANSACTION_RANK, expectedRank); + EpochProvider epochProvider = new VitessEpochProvider(); + RankProvider rankProvider = new VitessRankProvider(); + OrderedTransactionContext context = OrderedTransactionContext.load(offsets, epochProvider, rankProvider); + assertThat(context.getTransactionId()).isEqualTo(expectedId); + assertThat(context.getTransactionEpoch()).isEqualTo(expectedEpoch); + assertThat(context.getTransactionRank()).isEqualTo(new BigInteger(expectedRank)); + } + + @Test + public void shouldLoadWithNull() { + String expectedId = null; + Long expectedEpoch = null; + BigInteger expectedRank = null; + Map offsets = Collections.emptyMap(); + EpochProvider epochProvider = new VitessEpochProvider(); + RankProvider rankProvider = new VitessRankProvider(); + OrderedTransactionContext context = OrderedTransactionContext.load(offsets, epochProvider, rankProvider); + assertThat(context.getTransactionId()).isEqualTo(expectedId); + assertThat(context.getTransactionEpoch()).isEqualTo(expectedEpoch); + assertThat(context.getTransactionRank()).isEqualTo(expectedRank); + } + + @Test + public void shouldUpdateEpoch() { + EpochProvider epochProvider = new VitessEpochProvider(); + RankProvider rankProvider = new VitessRankProvider(); + OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider); + + String expectedTxId = ("host1:1-3,host2:3-4"); + context.beginTransaction(expectedTxId); + assertThat(context.getTransactionId()).isEqualTo(expectedTxId); + assertThat(context.getTransactionEpoch()).isEqualTo(0); + + String expectedTxId2 = "host1:1-3"; + context.beginTransaction(expectedTxId2); + assertThat(context.getTransactionId()).isEqualTo(expectedTxId2); + assertThat(context.getTransactionEpoch()).isEqualTo(1); + } + + @Test + public void shouldUpdateRank() { + EpochProvider epochProvider = new VitessEpochProvider(); + RankProvider rankProvider = new VitessRankProvider(); + OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider); + + String expectedTxId = ("host1:1-3,host2:3-4"); + context.beginTransaction(expectedTxId); + assertThat(context.getTransactionRank()).isEqualTo(7); + + String expectedTxId2 = "host1:1-3"; + context.beginTransaction(expectedTxId2); + assertThat(context.getTransactionRank()).isEqualTo(3); + } + + @Test + public void shouldStoreOffsets() { + EpochProvider epochProvider = new VitessEpochProvider(); + RankProvider rankProvider = new VitessRankProvider(); + OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider); + + String expectedTxId = ("host1:1-3,host2:3-4"); + context.beginTransaction(expectedTxId); + + Map offsets = new HashMap(); + Map actualOffsets = context.store(offsets); + assertThat(actualOffsets.get(OrderedTransactionContext.OFFSET_TRANSACTION_ID)).isEqualTo(expectedTxId); + assertThat(actualOffsets.get(OrderedTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(0L); + assertThat(actualOffsets.get(OrderedTransactionContext.OFFSET_TRANSACTION_RANK)).isEqualTo("7"); + } + + @Test + public void shouldKnowTransactionInProgress() { + EpochProvider epochProvider = new VitessEpochProvider(); + RankProvider rankProvider = new VitessRankProvider(); + OrderedTransactionContext context = new OrderedTransactionContext(epochProvider, rankProvider); + + String expectedTxId = ("host1:1-3,host2:3-4"); + context.beginTransaction(expectedTxId); + assertThat(context.isTransactionInProgress()).isTrue(); + context.endTransaction(); + assertThat(context.isTransactionInProgress()).isFalse(); + } + +} diff --git a/src/test/java/io/debezium/connector/vitess/transaction/VitessEpochProviderTest.java b/src/test/java/io/debezium/connector/vitess/transaction/VitessEpochProviderTest.java new file mode 100644 index 00000000..3bfb09d9 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/transaction/VitessEpochProviderTest.java @@ -0,0 +1,61 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +public class VitessEpochProviderTest { + + private String prefix = "MySQL56/"; + private String host1Tx1 = "027c67a2-c0b0-11ec-8a34-0ed0087913a5:1-11418261"; + private String host1Tx2 = "027c67a2-c0b0-11ec-8a34-0ed0087913a5:1-11418262"; + private String host2Tx1 = "08fb1cf3-0ce5-11ed-b921-0a8939501751:1-1443715"; + + private String previousTxId = prefix + String.join(",", host1Tx1, host2Tx1); + private String txId = prefix + String.join(",", host1Tx2, host2Tx1); + private String txIdShrunk = prefix + String.join(",", host1Tx2); + + private String txIdUpgrade = "MySQL82/" + String.join(",", host1Tx2); + + @Test + public void testGetEpochSameHostSet() { + EpochProvider provider = new VitessEpochProvider(); + Long epoch = provider.getEpoch(0L, previousTxId, txId); + assertThat(epoch).isEqualTo(0); + } + + @Test + public void testGetEpochShrunkHostSet() { + EpochProvider provider = new VitessEpochProvider(); + Long epoch = provider.getEpoch(0L, previousTxId, txIdShrunk); + assertThat(epoch).isEqualTo(1); + } + + @Test + public void testGetEpochExpandHostSet() { + EpochProvider provider = new VitessEpochProvider(); + Long epoch = provider.getEpoch(0L, previousTxId, txId); + assertThat(epoch).isEqualTo(0); + } + + @Test + public void testGetEpochDisjointThrowsException() { + EpochProvider provider = new VitessEpochProvider(); + Assertions.assertThatThrownBy(() -> { + provider.getEpoch(0L, previousTxId, "foo:1-2,bar:2-4"); + }).isInstanceOf(RuntimeException.class); + } + + @Test + public void testGetEpochVersionUpgrade() { + EpochProvider provider = new VitessEpochProvider(); + Long epoch = provider.getEpoch(0L, previousTxId, txIdUpgrade); + assertThat(epoch).isEqualTo(1); + } +} diff --git a/src/test/java/io/debezium/connector/vitess/transaction/VitessRankProviderTest.java b/src/test/java/io/debezium/connector/vitess/transaction/VitessRankProviderTest.java new file mode 100644 index 00000000..441dd1f6 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/transaction/VitessRankProviderTest.java @@ -0,0 +1,24 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.vitess.transaction; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigInteger; + +import org.junit.Test; + +public class VitessRankProviderTest { + + @Test + public void shouldGetRankOneHost() { + String txId = "host1:1-4"; + RankProvider provider = new VitessRankProvider(); + BigInteger rank = provider.getRank(txId); + assertThat(rank).isEqualTo(4); + } + +}