From 7e95814bb58d319d45640b1dc78d0e0900ad7c00 Mon Sep 17 00:00:00 2001 From: twthorn Date: Tue, 2 Apr 2024 16:45:13 -0500 Subject: [PATCH] DBZ-7698 Switch to Decimal type, add examples --- .../connector/vitess/VitessSchemaFactory.java | 4 ++- .../VitessOrderedTransactionContext.java | 32 +++++++++++++++++-- .../VitessOrderedTransactionStructMaker.java | 17 +++++++++- .../txmetadata/VitessRankProvider.java | 8 ++--- .../connector/vitess/VitessConnectorIT.java | 15 ++++++--- .../vitess/VitessSchemaFactoryTest.java | 3 +- .../VitessOrderedTransactionContextTest.java | 26 ++++++++------- ...tessOrderedTransactionStructMakerTest.java | 3 +- .../txmetadata/VitessRankProviderTest.java | 6 ++-- 9 files changed, 85 insertions(+), 29 deletions(-) diff --git a/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java b/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java index cf88ba2e..5e6bb634 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.vitess; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -25,6 +26,7 @@ public static VitessSchemaFactory get() { } public Schema getOrderedTransactionBlockSchema() { + Schema rankSchema = Decimal.schema(0).schema(); return SchemaBuilder.struct().optional() .name(TRANSACTION_BLOCK_SCHEMA_NAME) .version(TRANSACTION_BLOCK_SCHEMA_VERSION) @@ -32,7 +34,7 @@ public Schema getOrderedTransactionBlockSchema() { .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA) .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA) .field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA) - .field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA) + .field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, rankSchema) .build(); } } diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java index f7a7abcc..7f9ad69f 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java @@ -5,7 +5,7 @@ */ package io.debezium.connector.vitess.pipeline.txmetadata; -import java.math.BigInteger; +import java.math.BigDecimal; import java.util.Map; import io.debezium.connector.vitess.Vgtid; @@ -17,7 +17,7 @@ public class VitessOrderedTransactionContext extends TransactionContext { public static final String OFFSET_TRANSACTION_RANK = "transaction_rank"; protected String previousTransactionId = null; protected Long transactionEpoch = 0L; - protected BigInteger transactionRank = null; + protected BigDecimal transactionRank = null; private VitessEpochProvider epochProvider = new VitessEpochProvider(); private VitessRankProvider rankProvider = new VitessRankProvider(); @@ -32,6 +32,34 @@ public VitessOrderedTransactionContext(TransactionContext transactionContext) { this.totalEventCount = transactionContext.totalEventCount; } + /** + * Stores the needed information for determining Vitess rank & Epoch. Example (excluding standard fields added by super class): + * Input Offset map: + * { + * "transaction_id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]} \ + * {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}" + * } + * Current shard to epoch map, in epoch provider: + * { + * "-80": 0, + * "80-", 1 + * } + * Output offset map: + * { + * "transaction_id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]} \ + * {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}" + * "transaction_epoch": { + * "-80": 0, + * "80-", 1 + * } + * } + * + * Note: there is no need to store the transaction rank. We get the previous transaction ID from the "transaction_id" field + * and use that to compute the epoch. Rank requires no state (sum of max offsets of all hosts). + * + * @param offset + * @return + */ @Override public Map store(Map offset) { offset = super.store(offset); diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java index eea58fa1..d167c676 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java @@ -15,6 +15,21 @@ public class VitessOrderedTransactionStructMaker extends AbstractTransactionStructMaker implements TransactionStructMaker { + /** + * Adds the transaction block to a change log message. Transaction block example: + * "transaction": { + * "id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}, + * {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}", + * "total_order": 1, + * "data_collection_order": 1, + * "transaction_epoch": 0, + * "transaction_rank": 853 + * } + * @param offsetContext + * @param dataCollectionEventOrder + * @param value + * @return Struct with ordered transaction metadata + */ @Override public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) { Struct struct = super.prepareTxStruct(offsetContext, dataCollectionEventOrder, value); @@ -23,7 +38,7 @@ public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEv private Struct addOrderMetadata(Struct struct, OffsetContext offsetContext) { VitessOrderedTransactionContext context = getVitessTransactionOrderMetadata(offsetContext); - struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank.toString()); + struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank); struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch); return struct; } diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProvider.java index ebda013f..829630e7 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProvider.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProvider.java @@ -5,15 +5,15 @@ */ package io.debezium.connector.vitess.pipeline.txmetadata; -import java.math.BigInteger; +import java.math.BigDecimal; public class VitessRankProvider { - public BigInteger getRank(String transactionId) { + public BigDecimal getRank(String transactionId) { Gtid gtid = new Gtid(transactionId); - BigInteger rank = new BigInteger("0"); + BigDecimal rank = new BigDecimal("0"); for (String sequenceValue : gtid.getSequenceValues()) { - rank = rank.add(new BigInteger(sequenceValue)); + rank = rank.add(new BigDecimal(sequenceValue)); } return rank; } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index abe3d596..6dad5073 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -13,6 +13,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.math.BigDecimal; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -48,6 +49,7 @@ import io.debezium.connector.vitess.connection.VitessReplicationConnection; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessRankProvider; import io.debezium.converters.CloudEventsConverterTest; import io.debezium.converters.spi.CloudEventsMaker; import io.debezium.data.Envelope; @@ -424,10 +426,9 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception { startConnector(config -> config .with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class) .with(CommonConnectorConfig.TRANSACTION_STRUCT_MAKER, VitessOrderedTransactionStructMaker.class) - .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) - .with(VitessConnectorConfig.SHARD, "-80,80-"), + .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true), true, - "80-"); + "-80,80-"); assertConnectorIsRunning(); Vgtid baseVgtid = TestHelper.getCurrentVgtid(); @@ -465,14 +466,18 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception { SourceRecord beginRecord = assertRecordBeginSourceRecord(); assertThat(beginRecord.sourceOffset()).containsKey("transaction_epoch"); String expectedTxId1 = ((Struct) beginRecord.value()).getString("id"); + Long expectedEpoch = 0L; for (int i = 1; i <= expectedRecordsCount; i++) { SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD); + Struct source = (Struct) ((Struct) record.value()).get("source"); + String shard = source.getString("shard"); final Struct txn = ((Struct) record.value()).getStruct("transaction"); String txId = txn.getString("id"); assertThat(txId).isNotNull(); assertThat(txId).isEqualTo(expectedTxId1); - assertThat(txn.get("transaction_epoch")).isEqualTo(0L); - assertThat(txn.get("transaction_rank")).isNotNull(); + assertThat(txn.get("transaction_epoch")).isEqualTo(expectedEpoch); + BigDecimal expectedRank = new VitessRankProvider().getRank(Vgtid.of(expectedTxId1).getShardGtid(shard).getGtid()); + assertThat(txn.get("transaction_rank")).isEqualTo(expectedRank); Vgtid actualVgtid = Vgtid.of(txId); // The current vgtid is not the previous vgtid. assertThat(actualVgtid).isNotEqualTo(baseVgtid); diff --git a/src/test/java/io/debezium/connector/vitess/VitessSchemaFactoryTest.java b/src/test/java/io/debezium/connector/vitess/VitessSchemaFactoryTest.java index 416f08d7..c102fbb5 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessSchemaFactoryTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessSchemaFactoryTest.java @@ -9,6 +9,7 @@ import java.util.List; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.junit.Test; @@ -31,7 +32,7 @@ public void getOrderedTransactionBlockSchema() { assertThat(fields).contains(new Field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, 1, Schema.INT64_SCHEMA)); assertThat(fields).contains(new Field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, 2, Schema.INT64_SCHEMA)); assertThat(fields).contains(new Field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, 3, Schema.INT64_SCHEMA)); - assertThat(fields).contains(new Field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, 4, Schema.STRING_SCHEMA)); + assertThat(fields).contains(new Field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, 4, Decimal.schema(0))); } @Test diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java index 9685fa69..f1c7ebe5 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java @@ -7,7 +7,7 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.math.BigInteger; +import java.math.BigDecimal; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -17,6 +17,8 @@ import org.junit.Test; import io.debezium.connector.vitess.SourceInfo; +import io.debezium.connector.vitess.VgtidTest; +import io.debezium.pipeline.txmetadata.TransactionContext; public class VitessOrderedTransactionContextTest { @@ -29,13 +31,15 @@ public void shouldInit() { @Test public void shouldLoad() { - String expectedId = null; - String expectedEpoch = "{\"-80\": 0}"; + String expectedId = VgtidTest.VGTID_JSON; + String expectedEpoch = "{\"-80\": 5}"; Map offsets = Map.of( - VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch); - VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); - metadata.load(offsets); - assertThat(metadata.previousTransactionId).isEqualTo(expectedId); + VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch, + TransactionContext.OFFSET_TRANSACTION_ID, expectedId); + VitessOrderedTransactionContext context = VitessOrderedTransactionContext.load(offsets); + assertThat(context.previousTransactionId).isEqualTo(expectedId); + context.beginTransaction(new VitessTransactionInfo(VgtidTest.VGTID_JSON, "-80")); + assertThat(context.transactionEpoch).isEqualTo(5); } @Test @@ -54,7 +58,7 @@ public void shouldUpdateEpoch() { VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; - BigInteger expectedRank = new BigInteger("7"); + BigDecimal expectedRank = new BigDecimal("7"); long expectedEpoch = 0; String expectedShard = "-80"; @@ -64,7 +68,7 @@ public void shouldUpdateEpoch() { assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch); String expectedTxId2 = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"-80\"}]"; - BigInteger expectedRank2 = new BigInteger("3"); + BigDecimal expectedRank2 = new BigDecimal("3"); long expectedEpoch2 = 1; VitessTransactionInfo transactionInfo2 = new VitessTransactionInfo(expectedTxId2, expectedShard); @@ -82,12 +86,12 @@ public void shouldUpdateRank() { VitessTransactionInfo transactionInfo = new VitessTransactionInfo(expectedTxId, expectedShard); metadata.beginTransaction(transactionInfo); - assertThat(metadata.transactionRank).isEqualTo(7); + assertThat(metadata.transactionRank).isEqualTo(new BigDecimal(7)); String expectedTxId2 = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"-80\"}]"; VitessTransactionInfo transactionInfo2 = new VitessTransactionInfo(expectedTxId2, expectedShard); metadata.beginTransaction(transactionInfo2); - assertThat(metadata.transactionRank).isEqualTo(3); + assertThat(metadata.transactionRank).isEqualTo(new BigDecimal(3)); } @Test diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMakerTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMakerTest.java index 89a32d90..3524b777 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMakerTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMakerTest.java @@ -7,6 +7,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.math.BigDecimal; import java.time.Instant; import org.apache.kafka.connect.data.Struct; @@ -32,7 +33,7 @@ public void prepareTxStruct() { OffsetContext context = new VitessOffsetContext(config, Vgtid.of(VgtidTest.VGTID_JSON), Instant.now(), transactionContext); Struct struct = maker.prepareTxStruct(context, 0, null); assertThat(struct.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(0L); - assertThat(struct.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK)).isEqualTo("1513"); + assertThat(struct.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK)).isEqualTo(new BigDecimal(1513)); } @Test diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProviderTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProviderTest.java index 35071965..2d1bbb77 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProviderTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessRankProviderTest.java @@ -7,7 +7,7 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.math.BigInteger; +import java.math.BigDecimal; import org.junit.Test; @@ -17,8 +17,8 @@ public class VitessRankProviderTest { public void shouldGetRankOneHost() { String txId = "host1:1-4"; VitessRankProvider provider = new VitessRankProvider(); - BigInteger rank = provider.getRank(txId); - assertThat(rank).isEqualTo(4); + BigDecimal rank = provider.getRank(txId); + assertThat(rank).isEqualTo(new BigDecimal(4)); } }