diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index 49604274..da3e9c6d 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -32,9 +32,7 @@ import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.vitess.connection.VitessTabletType; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker; -import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionOrderMetadata; import io.debezium.jdbc.JdbcConfiguration; -import io.debezium.pipeline.txmetadata.TransactionOrderMetadata; import io.debezium.pipeline.txmetadata.TransactionStructMaker; import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.RelationalDatabaseConnectorConfig; @@ -397,9 +395,6 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(VitessSourceInfoStructMaker.class.getName()); - public static final Field TRANSACTION_ORDER_METADATA_FIELD = CommonConnectorConfig.TRANSACTION_ORDER_METADATA_FIELD - .withDefault(VitessTransactionOrderMetadata.class.getName()); - public static final Field TRANSACTION_STRUCT_MAKER = CommonConnectorConfig.TRANSACTION_STRUCT_MAKER .withDefault(VitessOrderedTransactionStructMaker.class.getName()); @@ -489,11 +484,6 @@ protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { return getSourceInfoStructMaker(SOURCE_INFO_STRUCT_MAKER, Module.name(), Module.version(), this); } - @Override - public TransactionOrderMetadata getTransactionOrderMetadata() { - return getTransactionOrderMetadata(TRANSACTION_ORDER_METADATA_FIELD); - } - @Override public TransactionStructMaker getTransactionStructMaker() { return getTransactionStructMaker(TRANSACTION_STRUCT_MAKER); diff --git a/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java b/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java index 91757e40..6f996551 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java +++ b/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java @@ -61,4 +61,9 @@ public String getTransactionId( return sourceInfo.getString(SourceInfo.VGTID_KEY); } + // @Override + // public TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) { + // + // } + } diff --git a/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java b/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java index 4797699e..057e4481 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java +++ b/src/main/java/io/debezium/connector/vitess/VitessOffsetContext.java @@ -19,7 +19,6 @@ import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; -import io.debezium.pipeline.txmetadata.TransactionContextSupplier; import io.debezium.relational.TableId; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Clock; @@ -52,8 +51,7 @@ public static VitessOffsetContext initialContext( LOGGER.info("No previous offset exists. Use default VGTID."); final Vgtid defaultVgtid = VitessReplicationConnection.defaultVgtid(connectorConfig); // use the other transaction context - TransactionContextSupplier supplier = new TransactionContextSupplier(connectorConfig); - TransactionContext transactionContext = supplier.newTransactionContext(); + TransactionContext transactionContext = connectorConfig.getTransactionContext(); VitessOffsetContext context = new VitessOffsetContext( connectorConfig, defaultVgtid, clock.currentTimeAsInstant(), transactionContext); return context; @@ -149,8 +147,7 @@ public Loader(VitessConnectorConfig connectorConfig) { @Override public VitessOffsetContext load(Map offset) { final String vgtid = (String) offset.get(SourceInfo.VGTID_KEY); - TransactionContextSupplier supplier = new TransactionContextSupplier(connectorConfig); - TransactionContext transactionContext = supplier.loadTransactionContext(offset); + TransactionContext transactionContext = connectorConfig.getTransactionContext(); return new VitessOffsetContext( connectorConfig, Vgtid.of(vgtid), diff --git a/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java b/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java index 1e80de2c..c2bceb18 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java @@ -11,7 +11,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; -import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionOrderMetadata; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext; import io.debezium.data.Envelope; import io.debezium.pipeline.txmetadata.TransactionStructMaker; import io.debezium.schema.SchemaFactory; @@ -35,8 +35,8 @@ public Schema getOrderedTransactionBlockSchema() { .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA) .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA) .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA) - .field(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA) - .field(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA) + .field(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA) + .field(VitessTransactionContext.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA) .build(); } diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java index 7b722d27..b7d27103 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java @@ -42,7 +42,7 @@ else if (gtid.isHostSetSubsetOf(previousGtid)) { public Map store(Map offset) { try { - offset.put(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch)); + offset.put(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch)); return offset; } catch (JsonProcessingException e) { @@ -52,7 +52,7 @@ public Map store(Map offset) { public void load(Map offsets) { try { - String shardToEpochString = (String) offsets.get(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH); + String shardToEpochString = (String) offsets.get(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH); if (shardToEpochString != null) { shardToEpoch = MAPPER.readValue(shardToEpochString, new TypeReference>() { }); diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java index 67b58e13..c10e7d33 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java @@ -11,28 +11,25 @@ import io.debezium.connector.vitess.VitessSchemaFactory; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.AbstractTransactionStructMaker; -import io.debezium.pipeline.txmetadata.OrderedTransactionContext; import io.debezium.pipeline.txmetadata.TransactionStructMaker; -import io.debezium.spi.schema.DataCollectionId; public class VitessOrderedTransactionStructMaker extends AbstractTransactionStructMaker implements TransactionStructMaker { @Override - public Struct prepareTxStruct(OffsetContext offsetContext, DataCollectionId source) { - Struct struct = super.prepareTxStruct(offsetContext, source); + public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEventOrder, Struct value) { + Struct struct = super.prepareTxStruct(offsetContext, dataCollectionEventOrder, value); return addOrderMetadata(struct, offsetContext); } private Struct addOrderMetadata(Struct struct, OffsetContext offsetContext) { - VitessTransactionOrderMetadata metadata = getVitessTransactionOrderMetadata(offsetContext); - struct.put(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_RANK, metadata.transactionRank.toString()); - struct.put(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH, metadata.transactionEpoch); + VitessTransactionContext context = getVitessTransactionOrderMetadata(offsetContext); + struct.put(VitessTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank.toString()); + struct.put(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch); return struct; } - private VitessTransactionOrderMetadata getVitessTransactionOrderMetadata(OffsetContext offsetContext) { - OrderedTransactionContext context = (OrderedTransactionContext) offsetContext.getTransactionContext(); - return (VitessTransactionOrderMetadata) context.getTransactionOrderMetadata(); + private VitessTransactionContext getVitessTransactionOrderMetadata(OffsetContext offsetContext) { + return (VitessTransactionContext) offsetContext.getTransactionContext(); } @Override diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionOrderMetadata.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContext.java similarity index 59% rename from src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionOrderMetadata.java rename to src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContext.java index b3d84aa8..2cced919 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionOrderMetadata.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContext.java @@ -11,37 +11,51 @@ import io.debezium.connector.vitess.Vgtid; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionInfo; -import io.debezium.pipeline.txmetadata.TransactionOrderMetadata; - -public class VitessTransactionOrderMetadata implements TransactionOrderMetadata { +public class VitessTransactionContext extends TransactionContext { public static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch"; public static final String OFFSET_TRANSACTION_RANK = "transaction_rank"; protected String previousTransactionId = null; - protected Long transactionEpoch; - protected BigInteger transactionRank; + protected Long transactionEpoch = 0L; + protected BigInteger transactionRank = null; private VitessEpochProvider epochProvider = new VitessEpochProvider(); private VitessRankProvider rankProvider = new VitessRankProvider(); + public VitessTransactionContext() { + } + + public VitessTransactionContext(TransactionContext transactionContext) { + super(); + // Copy fields + this.transactionId = transactionContext.transactionId; + this.perTableEventCount.putAll(transactionContext.perTableEventCount); + this.totalEventCount = transactionContext.totalEventCount; + } + @Override public Map store(Map offset) { + offset = super.store(offset); return epochProvider.store(offset); } - @Override - public void load(Map offsets) { - this.previousTransactionId = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID); - epochProvider.load(offsets); + public static VitessTransactionContext load(Map offsets) { + TransactionContext transactionContext = TransactionContext.load(offsets); + VitessTransactionContext vitessTransactionContext = new VitessTransactionContext(transactionContext); + vitessTransactionContext.previousTransactionId = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID); + vitessTransactionContext.epochProvider.load(offsets); + return vitessTransactionContext; } @Override public void beginTransaction(TransactionInfo transactionInfo) { + super.beginTransaction(transactionInfo); VitessTransactionInfo vitessTransactionInfo = (VitessTransactionInfo) transactionInfo; beginTransaction(vitessTransactionInfo.getShard(), vitessTransactionInfo.getTransactionId()); } @Override public void endTransaction() { + super.endTransaction(); this.transactionEpoch = null; this.transactionRank = null; } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index e4c9a775..b184bc7b 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -46,6 +46,8 @@ import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.connector.vitess.connection.VitessReplicationConnection; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext; import io.debezium.converters.CloudEventsConverterTest; import io.debezium.converters.spi.CloudEventsMaker; import io.debezium.data.Envelope; @@ -421,7 +423,8 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception { TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE); TestHelper.applyVSchema("vitess_vschema.json"); startConnector(config -> config - .with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true) + .with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class) + .with(CommonConnectorConfig.TRANSACTION_STRUCT_MAKER, VitessOrderedTransactionStructMaker.class) .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .with(VitessConnectorConfig.SHARD, "-80,80-"), true, diff --git a/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java b/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java index 27629a47..6adeb531 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java @@ -13,7 +13,7 @@ import org.junit.Before; import org.junit.Test; -import io.debezium.pipeline.txmetadata.OrderedTransactionContext; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.util.Clock; import io.debezium.util.Collect; @@ -103,23 +103,23 @@ public void shouldResetToNewVGgtid() { public void shouldGetOrderedTransactionContext() { VitessConnectorConfig config = new VitessConnectorConfig( TestHelper.defaultConfig() - .with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true) + .with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class) .build()); VitessOffsetContext.Loader loader = new VitessOffsetContext.Loader(config); Map offsets = Map.of(SourceInfo.VGTID_KEY, VGTID_JSON); VitessOffsetContext context = loader.load(offsets); TransactionContext transactionContext = context.getTransactionContext(); - assertThat(transactionContext).isInstanceOf(OrderedTransactionContext.class); + assertThat(transactionContext).isInstanceOf(VitessTransactionContext.class); } @Test public void shouldGetInitialOrderedTransactionContext() { VitessConnectorConfig config = new VitessConnectorConfig( TestHelper.defaultConfig() - .with(VitessConnectorConfig.PROVIDE_ORDERED_TRANSACTION_METADATA, true) + .with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class) .build()); VitessOffsetContext context = VitessOffsetContext.initialContext(config, Clock.system()); TransactionContext transactionContext = context.getTransactionContext(); - assertThat(transactionContext).isInstanceOf(OrderedTransactionContext.class); + assertThat(transactionContext).isInstanceOf(VitessTransactionContext.class); } } diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionOrderMetadataTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContextTest.java similarity index 79% rename from src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionOrderMetadataTest.java rename to src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContextTest.java index c4e75292..69977ad5 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionOrderMetadataTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContextTest.java @@ -5,36 +5,35 @@ */ package io.debezium.connector.vitess.pipeline.txmetadata; -import io.debezium.connector.vitess.SourceInfo; -import io.debezium.pipeline.txmetadata.OrderedTransactionContext; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.junit.Test; +import static org.assertj.core.api.Assertions.assertThat; import java.math.BigInteger; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import static org.assertj.core.api.Assertions.assertThat; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.junit.Test; -public class VitessTransactionOrderMetadataTest { +import io.debezium.connector.vitess.SourceInfo; + +public class VitessTransactionContextTest { private static final Schema sourceStructSchema = SchemaBuilder.struct().field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA); @Test public void shouldInit() { - new VitessTransactionOrderMetadata(); + new VitessTransactionContext(); } @Test public void shouldLoad() { - String expectedId = "foo"; + String expectedId = null; String expectedEpoch = "{\"-80\": 0}"; Map offsets = Map.of( - OrderedTransactionContext.OFFSET_TRANSACTION_ID, expectedId, - VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH, expectedEpoch); - VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata(); + VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch); + VitessTransactionContext metadata = new VitessTransactionContext(); metadata.load(offsets); assertThat(metadata.previousTransactionId).isEqualTo(expectedId); } @@ -42,9 +41,9 @@ public void shouldLoad() { @Test public void shouldLoadWithNull() { String expectedId = null; - Long expectedEpoch = null; + Long expectedEpoch = 0L; Map offsets = Collections.emptyMap(); - VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata(); + VitessTransactionContext metadata = new VitessTransactionContext(); metadata.load(offsets); assertThat(metadata.previousTransactionId).isEqualTo(expectedId); assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch); @@ -52,7 +51,7 @@ public void shouldLoadWithNull() { @Test public void shouldUpdateEpoch() { - VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata(); + VitessTransactionContext metadata = new VitessTransactionContext(); String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; BigInteger expectedRank = new BigInteger("7"); @@ -76,7 +75,7 @@ public void shouldUpdateEpoch() { @Test public void shouldUpdateRank() { - VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata(); + VitessTransactionContext metadata = new VitessTransactionContext(); String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; String expectedShard = "-80"; @@ -93,7 +92,7 @@ public void shouldUpdateRank() { @Test public void shouldStoreOffsets() { - VitessTransactionOrderMetadata metadata = new VitessTransactionOrderMetadata(); + VitessTransactionContext metadata = new VitessTransactionContext(); String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; String expectedShard = "-80"; @@ -104,6 +103,6 @@ public void shouldStoreOffsets() { Map offsets = new HashMap(); String expectedEpoch = "{\"-80\":0}"; Map actualOffsets = metadata.store(offsets); - assertThat(actualOffsets.get(VitessTransactionOrderMetadata.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch); + assertThat(actualOffsets.get(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch); } }