Skip to content

Commit

Permalink
DBZ-7698 Refactor to generic transaction block customization
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Mar 29, 2024
1 parent dff6c1b commit 5d10208
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.vitess.connection.VitessTabletType;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionOrderMetadata;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.txmetadata.TransactionOrderMetadata;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
Expand Down Expand Up @@ -397,9 +395,6 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(VitessSourceInfoStructMaker.class.getName());

public static final Field TRANSACTION_ORDER_METADATA_FIELD = CommonConnectorConfig.TRANSACTION_ORDER_METADATA_FIELD
.withDefault(VitessTransactionOrderMetadata.class.getName());

public static final Field TRANSACTION_STRUCT_MAKER = CommonConnectorConfig.TRANSACTION_STRUCT_MAKER
.withDefault(VitessOrderedTransactionStructMaker.class.getName());

Expand Down Expand Up @@ -489,11 +484,6 @@ protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {
return getSourceInfoStructMaker(SOURCE_INFO_STRUCT_MAKER, Module.name(), Module.version(), this);
}

@Override
public TransactionOrderMetadata getTransactionOrderMetadata() {
return getTransactionOrderMetadata(TRANSACTION_ORDER_METADATA_FIELD);
}

@Override
public TransactionStructMaker getTransactionStructMaker() {
return getTransactionStructMaker(TRANSACTION_STRUCT_MAKER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ public String getTransactionId(
return sourceInfo.getString(SourceInfo.VGTID_KEY);
}

// @Override
// public TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
//
// }

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionContextSupplier;
import io.debezium.relational.TableId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
Expand Down Expand Up @@ -52,8 +51,7 @@ public static VitessOffsetContext initialContext(
LOGGER.info("No previous offset exists. Use default VGTID.");
final Vgtid defaultVgtid = VitessReplicationConnection.defaultVgtid(connectorConfig);
// use the other transaction context
TransactionContextSupplier supplier = new TransactionContextSupplier(connectorConfig);
TransactionContext transactionContext = supplier.newTransactionContext();
TransactionContext transactionContext = connectorConfig.getTransactionContext();
VitessOffsetContext context = new VitessOffsetContext(
connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), transactionContext);
return context;
Expand Down Expand Up @@ -149,8 +147,7 @@ public Loader(VitessConnectorConfig connectorConfig) {
@Override
public VitessOffsetContext load(Map<String, ?> offset) {
final String vgtid = (String) offset.get(SourceInfo.VGTID_KEY);
TransactionContextSupplier supplier = new TransactionContextSupplier(connectorConfig);
TransactionContext transactionContext = supplier.loadTransactionContext(offset);
TransactionContext transactionContext = connectorConfig.getTransactionContext();
return new VitessOffsetContext(
connectorConfig,
Vgtid.of(vgtid),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionOrderMetadata;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext;
import io.debezium.data.Envelope;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.schema.SchemaFactory;
Expand All @@ -35,8 +35,8 @@ public Schema getOrderedTransactionBlockSchema() {
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA)
.field(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA)
.field(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA)
.field(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA)
.field(VitessTransactionContext.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ else if (gtid.isHostSetSubsetOf(previousGtid)) {

public Map<String, Object> store(Map<String, Object> offset) {
try {
offset.put(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch));
offset.put(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch));
return offset;
}
catch (JsonProcessingException e) {
Expand All @@ -52,7 +52,7 @@ public Map<String, Object> store(Map<String, Object> offset) {

public void load(Map<String, ?> offsets) {
try {
String shardToEpochString = (String) offsets.get(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH);
String shardToEpochString = (String) offsets.get(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH);
if (shardToEpochString != null) {
shardToEpoch = MAPPER.readValue(shardToEpochString, new TypeReference<Map<String, Long>>() {
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,25 @@
import io.debezium.connector.vitess.VitessSchemaFactory;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.AbstractTransactionStructMaker;
import io.debezium.pipeline.txmetadata.OrderedTransactionContext;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.spi.schema.DataCollectionId;

public class VitessOrderedTransactionStructMaker extends AbstractTransactionStructMaker implements TransactionStructMaker {

@Override
public Struct prepareTxStruct(OffsetContext offsetContext, DataCollectionId source) {
Struct struct = super.prepareTxStruct(offsetContext, source);
public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) {
Struct struct = super.prepareTxStruct(offsetContext, dataCollectionEventOrder, value);
return addOrderMetadata(struct, offsetContext);
}

private Struct addOrderMetadata(Struct struct, OffsetContext offsetContext) {
VitessTransactionOrderMetadata metadata = getVitessTransactionOrderMetadata(offsetContext);
struct.put(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_RANK, metadata.transactionRank.toString());
struct.put(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH, metadata.transactionEpoch);
VitessTransactionContext context = getVitessTransactionOrderMetadata(offsetContext);
struct.put(VitessTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank.toString());
struct.put(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch);
return struct;
}

private VitessTransactionOrderMetadata getVitessTransactionOrderMetadata(OffsetContext offsetContext) {
OrderedTransactionContext context = (OrderedTransactionContext) offsetContext.getTransactionContext();
return (VitessTransactionOrderMetadata) context.getTransactionOrderMetadata();
private VitessTransactionContext getVitessTransactionOrderMetadata(OffsetContext offsetContext) {
return (VitessTransactionContext) offsetContext.getTransactionContext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,51 @@
import io.debezium.connector.vitess.Vgtid;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionInfo;
import io.debezium.pipeline.txmetadata.TransactionOrderMetadata;

public class VitessTransactionOrderMetadata implements TransactionOrderMetadata {

public class VitessTransactionContext extends TransactionContext {
public static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch";
public static final String OFFSET_TRANSACTION_RANK = "transaction_rank";
protected String previousTransactionId = null;
protected Long transactionEpoch;
protected BigInteger transactionRank;
protected Long transactionEpoch = 0L;
protected BigInteger transactionRank = null;
private VitessEpochProvider epochProvider = new VitessEpochProvider();
private VitessRankProvider rankProvider = new VitessRankProvider();

public VitessTransactionContext() {
}

public VitessTransactionContext(TransactionContext transactionContext) {
super();
// Copy fields
this.transactionId = transactionContext.transactionId;
this.perTableEventCount.putAll(transactionContext.perTableEventCount);
this.totalEventCount = transactionContext.totalEventCount;
}

@Override
public Map<String, Object> store(Map<String, Object> offset) {
offset = super.store(offset);
return epochProvider.store(offset);
}

@Override
public void load(Map<String, ?> offsets) {
this.previousTransactionId = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID);
epochProvider.load(offsets);
public static VitessTransactionContext load(Map<String, ?> offsets) {
TransactionContext transactionContext = TransactionContext.load(offsets);
VitessTransactionContext vitessTransactionContext = new VitessTransactionContext(transactionContext);
vitessTransactionContext.previousTransactionId = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID);
vitessTransactionContext.epochProvider.load(offsets);
return vitessTransactionContext;
}

@Override
public void beginTransaction(TransactionInfo transactionInfo) {
super.beginTransaction(transactionInfo);
VitessTransactionInfo vitessTransactionInfo = (VitessTransactionInfo) transactionInfo;
beginTransaction(vitessTransactionInfo.getShard(), vitessTransactionInfo.getTransactionId());
}

@Override
public void endTransaction() {
super.endTransaction();
this.transactionEpoch = null;
this.transactionRank = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.data.Envelope;
Expand Down Expand Up @@ -421,7 +423,8 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
TestHelper.applyVSchema("vitess_vschema.json");
startConnector(config -> config
.with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true)
.with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class)
.with(CommonConnectorConfig.TRANSACTION_STRUCT_MAKER, VitessOrderedTransactionStructMaker.class)
.with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(VitessConnectorConfig.SHARD, "-80,80-"),
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.junit.Before;
import org.junit.Test;

import io.debezium.pipeline.txmetadata.OrderedTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
Expand Down Expand Up @@ -103,23 +103,23 @@ public void shouldResetToNewVGgtid() {
public void shouldGetOrderedTransactionContext() {
VitessConnectorConfig config = new VitessConnectorConfig(
TestHelper.defaultConfig()
.with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true)
.with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class)
.build());
VitessOffsetContext.Loader loader = new VitessOffsetContext.Loader(config);
Map offsets = Map.of(SourceInfo.VGTID_KEY, VGTID_JSON);
VitessOffsetContext context = loader.load(offsets);
TransactionContext transactionContext = context.getTransactionContext();
assertThat(transactionContext).isInstanceOf(OrderedTransactionContext.class);
assertThat(transactionContext).isInstanceOf(VitessTransactionContext.class);
}

@Test
public void shouldGetInitialOrderedTransactionContext() {
VitessConnectorConfig config = new VitessConnectorConfig(
TestHelper.defaultConfig()
.with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true)
.with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class)
.build());
VitessOffsetContext context = VitessOffsetContext.initialContext(config, Clock.system());
TransactionContext transactionContext = context.getTransactionContext();
assertThat(transactionContext).isInstanceOf(OrderedTransactionContext.class);
assertThat(transactionContext).isInstanceOf(VitessTransactionContext.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,53 @@
*/
package io.debezium.connector.vitess.pipeline.txmetadata;

import io.debezium.connector.vitess.SourceInfo;
import io.debezium.pipeline.txmetadata.OrderedTransactionContext;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;

import java.math.BigInteger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Test;

public class VitessTransactionOrderMetadataTest {
import io.debezium.connector.vitess.SourceInfo;

public class VitessTransactionContextTest {

private static final Schema sourceStructSchema = SchemaBuilder.struct().field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA);

@Test
public void shouldInit() {
new VitessTransactionOrderMetadata();
new VitessTransactionContext();
}

@Test
public void shouldLoad() {
String expectedId = "foo";
String expectedId = null;
String expectedEpoch = "{\"-80\": 0}";
Map offsets = Map.of(
OrderedTransactionContext.OFFSET_TRANSACTION_ID, expectedId,
VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH, expectedEpoch);
VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata();
VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch);
VitessTransactionContext metadata = new VitessTransactionContext();
metadata.load(offsets);
assertThat(metadata.previousTransactionId).isEqualTo(expectedId);
}

@Test
public void shouldLoadWithNull() {
String expectedId = null;
Long expectedEpoch = null;
Long expectedEpoch = 0L;
Map offsets = Collections.emptyMap();
VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata();
VitessTransactionContext metadata = new VitessTransactionContext();
metadata.load(offsets);
assertThat(metadata.previousTransactionId).isEqualTo(expectedId);
assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch);
}

@Test
public void shouldUpdateEpoch() {
VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata();
VitessTransactionContext metadata = new VitessTransactionContext();

String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]";
BigInteger expectedRank = new BigInteger("7");
Expand All @@ -76,7 +75,7 @@ public void shouldUpdateEpoch() {

@Test
public void shouldUpdateRank() {
VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata();
VitessTransactionContext metadata = new VitessTransactionContext();

String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]";
String expectedShard = "-80";
Expand All @@ -93,7 +92,7 @@ public void shouldUpdateRank() {

@Test
public void shouldStoreOffsets() {
VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata();
VitessTransactionContext metadata = new VitessTransactionContext();

String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]";
String expectedShard = "-80";
Expand All @@ -104,6 +103,6 @@ public void shouldStoreOffsets() {
Map offsets = new HashMap();
String expectedEpoch = "{\"-80\":0}";
Map actualOffsets = metadata.store(offsets);
assertThat(actualOffsets.get(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch);
assertThat(actualOffsets.get(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch);
}
}

0 comments on commit 5d10208

Please sign in to comment.