Skip to content

Commit

Permalink
DBZ-7698 Switch to Decimal type, add examples
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Apr 2, 2024
1 parent 289e0c5 commit 7e95814
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.connector.vitess;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

Expand All @@ -25,14 +26,15 @@ public static VitessSchemaFactory get() {
}

public Schema getOrderedTransactionBlockSchema() {
Schema rankSchema = Decimal.schema(0).schema();
return SchemaBuilder.struct().optional()
.name(TRANSACTION_BLOCK_SCHEMA_NAME)
.version(TRANSACTION_BLOCK_SCHEMA_VERSION)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA)
.field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA)
.field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA)
.field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, rankSchema)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
package io.debezium.connector.vitess.pipeline.txmetadata;

import java.math.BigInteger;
import java.math.BigDecimal;
import java.util.Map;

import io.debezium.connector.vitess.Vgtid;
Expand All @@ -17,7 +17,7 @@ public class VitessOrderedTransactionContext extends TransactionContext {
public static final String OFFSET_TRANSACTION_RANK = "transaction_rank";
protected String previousTransactionId = null;
protected Long transactionEpoch = 0L;
protected BigInteger transactionRank = null;
protected BigDecimal transactionRank = null;
private VitessEpochProvider epochProvider = new VitessEpochProvider();
private VitessRankProvider rankProvider = new VitessRankProvider();

Expand All @@ -32,6 +32,34 @@ public VitessOrderedTransactionContext(TransactionContext transactionContext) {
this.totalEventCount = transactionContext.totalEventCount;
}

/**
* Stores the needed information for determining Vitess rank & Epoch. Example (excluding standard fields added by super class):
* Input Offset map:
* {
* "transaction_id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]} \
* {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}"
* }
* Current shard to epoch map, in epoch provider:
* {
* "-80": 0,
* "80-", 1
* }
* Output offset map:
* {
* "transaction_id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]} \
* {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}"
* "transaction_epoch": {
* "-80": 0,
* "80-", 1
* }
* }
*
* Note: there is no need to store the transaction rank. We get the previous transaction ID from the "transaction_id" field
* and use that to compute the epoch. Rank requires no state (sum of max offsets of all hosts).
*
* @param offset
* @return
*/
@Override
public Map<String, Object> store(Map<String, Object> offset) {
offset = super.store(offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@

public class VitessOrderedTransactionStructMaker extends AbstractTransactionStructMaker implements TransactionStructMaker {

/**
* Adds the transaction block to a change log message. Transaction block example:
* "transaction": {
* "id": "[{\"keyspace\":\ks1\",\"shard\":\"-80\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]},
* {\"keyspace\":\ks1\",\"shard\":\"80-\",\"gtid\":\"MySQL56/host1:123,host2:234\",\"table_p_ks\":[]}",
* "total_order": 1,
* "data_collection_order": 1,
* "transaction_epoch": 0,
* "transaction_rank": 853
* }
* @param offsetContext
* @param dataCollectionEventOrder
* @param value
* @return Struct with ordered transaction metadata
*/
@Override
public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) {
Struct struct = super.prepareTxStruct(offsetContext, dataCollectionEventOrder, value);
Expand All @@ -23,7 +38,7 @@ public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEv

private Struct addOrderMetadata(Struct struct, OffsetContext offsetContext) {
VitessOrderedTransactionContext context = getVitessTransactionOrderMetadata(offsetContext);
struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank.toString());
struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank);
struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch);
return struct;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
*/
package io.debezium.connector.vitess.pipeline.txmetadata;

import java.math.BigInteger;
import java.math.BigDecimal;

public class VitessRankProvider {

public BigInteger getRank(String transactionId) {
public BigDecimal getRank(String transactionId) {
Gtid gtid = new Gtid(transactionId);
BigInteger rank = new BigInteger("0");
BigDecimal rank = new BigDecimal("0");
for (String sequenceValue : gtid.getSequenceValues()) {
rank = rank.add(new BigInteger(sequenceValue));
rank = rank.add(new BigDecimal(sequenceValue));
}
return rank;
}
Expand Down
15 changes: 10 additions & 5 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -48,6 +49,7 @@
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessRankProvider;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.data.Envelope;
Expand Down Expand Up @@ -424,10 +426,9 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception {
startConnector(config -> config
.with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class)
.with(CommonConnectorConfig.TRANSACTION_STRUCT_MAKER, VitessOrderedTransactionStructMaker.class)
.with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(VitessConnectorConfig.SHARD, "-80,80-"),
.with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true),
true,
"80-");
"-80,80-");
assertConnectorIsRunning();

Vgtid baseVgtid = TestHelper.getCurrentVgtid();
Expand Down Expand Up @@ -465,14 +466,18 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception {
SourceRecord beginRecord = assertRecordBeginSourceRecord();
assertThat(beginRecord.sourceOffset()).containsKey("transaction_epoch");
String expectedTxId1 = ((Struct) beginRecord.value()).getString("id");
Long expectedEpoch = 0L;
for (int i = 1; i <= expectedRecordsCount; i++) {
SourceRecord record = assertRecordInserted(TEST_SHARDED_KEYSPACE + ".numeric_table", TestHelper.PK_FIELD);
Struct source = (Struct) ((Struct) record.value()).get("source");
String shard = source.getString("shard");
final Struct txn = ((Struct) record.value()).getStruct("transaction");
String txId = txn.getString("id");
assertThat(txId).isNotNull();
assertThat(txId).isEqualTo(expectedTxId1);
assertThat(txn.get("transaction_epoch")).isEqualTo(0L);
assertThat(txn.get("transaction_rank")).isNotNull();
assertThat(txn.get("transaction_epoch")).isEqualTo(expectedEpoch);
BigDecimal expectedRank = new VitessRankProvider().getRank(Vgtid.of(expectedTxId1).getShardGtid(shard).getGtid());
assertThat(txn.get("transaction_rank")).isEqualTo(expectedRank);
Vgtid actualVgtid = Vgtid.of(txId);
// The current vgtid is not the previous vgtid.
assertThat(actualVgtid).isNotEqualTo(baseVgtid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.util.List;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.junit.Test;
Expand All @@ -31,7 +32,7 @@ public void getOrderedTransactionBlockSchema() {
assertThat(fields).contains(new Field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, 1, Schema.INT64_SCHEMA));
assertThat(fields).contains(new Field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, 2, Schema.INT64_SCHEMA));
assertThat(fields).contains(new Field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, 3, Schema.INT64_SCHEMA));
assertThat(fields).contains(new Field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, 4, Schema.STRING_SCHEMA));
assertThat(fields).contains(new Field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, 4, Decimal.schema(0)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.math.BigInteger;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -17,6 +17,8 @@
import org.junit.Test;

import io.debezium.connector.vitess.SourceInfo;
import io.debezium.connector.vitess.VgtidTest;
import io.debezium.pipeline.txmetadata.TransactionContext;

public class VitessOrderedTransactionContextTest {

Expand All @@ -29,13 +31,15 @@ public void shouldInit() {

@Test
public void shouldLoad() {
String expectedId = null;
String expectedEpoch = "{\"-80\": 0}";
String expectedId = VgtidTest.VGTID_JSON;
String expectedEpoch = "{\"-80\": 5}";
Map offsets = Map.of(
VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch);
VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext();
metadata.load(offsets);
assertThat(metadata.previousTransactionId).isEqualTo(expectedId);
VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch,
TransactionContext.OFFSET_TRANSACTION_ID, expectedId);
VitessOrderedTransactionContext context = VitessOrderedTransactionContext.load(offsets);
assertThat(context.previousTransactionId).isEqualTo(expectedId);
context.beginTransaction(new VitessTransactionInfo(VgtidTest.VGTID_JSON, "-80"));
assertThat(context.transactionEpoch).isEqualTo(5);
}

@Test
Expand All @@ -54,7 +58,7 @@ public void shouldUpdateEpoch() {
VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext();

String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]";
BigInteger expectedRank = new BigInteger("7");
BigDecimal expectedRank = new BigDecimal("7");
long expectedEpoch = 0;
String expectedShard = "-80";

Expand All @@ -64,7 +68,7 @@ public void shouldUpdateEpoch() {
assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch);

String expectedTxId2 = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"-80\"}]";
BigInteger expectedRank2 = new BigInteger("3");
BigDecimal expectedRank2 = new BigDecimal("3");
long expectedEpoch2 = 1;

VitessTransactionInfo transactionInfo2 = new VitessTransactionInfo(expectedTxId2, expectedShard);
Expand All @@ -82,12 +86,12 @@ public void shouldUpdateRank() {

VitessTransactionInfo transactionInfo = new VitessTransactionInfo(expectedTxId, expectedShard);
metadata.beginTransaction(transactionInfo);
assertThat(metadata.transactionRank).isEqualTo(7);
assertThat(metadata.transactionRank).isEqualTo(new BigDecimal(7));

String expectedTxId2 = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3\", \"shard\": \"-80\"}]";
VitessTransactionInfo transactionInfo2 = new VitessTransactionInfo(expectedTxId2, expectedShard);
metadata.beginTransaction(transactionInfo2);
assertThat(metadata.transactionRank).isEqualTo(3);
assertThat(metadata.transactionRank).isEqualTo(new BigDecimal(3));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.math.BigDecimal;
import java.time.Instant;

import org.apache.kafka.connect.data.Struct;
Expand All @@ -32,7 +33,7 @@ public void prepareTxStruct() {
OffsetContext context = new VitessOffsetContext(config, Vgtid.of(VgtidTest.VGTID_JSON), Instant.now(), transactionContext);
Struct struct = maker.prepareTxStruct(context, 0, null);
assertThat(struct.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(0L);
assertThat(struct.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK)).isEqualTo("1513");
assertThat(struct.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK)).isEqualTo(new BigDecimal(1513));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.math.BigInteger;
import java.math.BigDecimal;

import org.junit.Test;

Expand All @@ -17,8 +17,8 @@ public class VitessRankProviderTest {
public void shouldGetRankOneHost() {
String txId = "host1:1-4";
VitessRankProvider provider = new VitessRankProvider();
BigInteger rank = provider.getRank(txId);
assertThat(rank).isEqualTo(4);
BigDecimal rank = provider.getRank(txId);
assertThat(rank).isEqualTo(new BigDecimal(4));
}

}

0 comments on commit 7e95814

Please sign in to comment.