diff --git a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java index 0210b3a3..7a7c2c72 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java @@ -105,7 +105,7 @@ private ReplicationMessageProcessor newReplicationMessageProcessor(VitessPartiti VitessTransactionInfo transactionInfo = new VitessTransactionInfo(message.getTransactionId(), message.getShard()); dispatcher.dispatchTransactionStartedEvent(partition, transactionInfo, offsetContext, message.getCommitTime()); } - else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) { + else { // When COMMIT event is received, all events have been processed except for this COMMIT event // We reset the VGTID such that current & restart VGTIDs are equal to this transaction's VGTID // We send one final event (transaction committed), the offset will only be committed if that event @@ -118,7 +118,6 @@ else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) { // Send a heartbeat event if time has elapsed dispatcher.dispatchHeartbeatEvent(partition, offsetContext); } - return; } else if (message.getOperation() == ReplicationMessage.Operation.OTHER) { offsetContext.rotateVgtid(newVgtid, message.getCommitTime()); diff --git a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java index 500079d7..30909074 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java @@ -18,6 +18,9 @@ public class TransactionalMessage implements ReplicationMessage { private final String shard; public TransactionalMessage(Operation operation, String transactionId, Instant commitTime, String keyspace, String shard) { + if (operation != Operation.BEGIN && operation != Operation.COMMIT) { + throw new IllegalArgumentException("TransactionalMessage can only have BEGIN or COMMIT operations"); + } this.transactionId = transactionId; this.commitTime = commitTime; this.operation = operation; diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 951d8ca9..40dec9f1 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -2169,7 +2169,8 @@ private void testOffsetStorage(boolean offsetStoragePerTask) throws Exception { startConnector(Function.identity(), hasMultipleShards, offsetStoragePerTask, numTasks, gen, 1, null, ""); consumer = testConsumer(1); - executeAndWait("INSERT INTO pk_single_unique_key_table (id, int_col) VALUES (1, 1);", TEST_UNSHARDED_KEYSPACE); + executeAndWait("INSERT INTO pk_single_unique_key_table (id, int_col) VALUES (1, 1);", + TEST_UNSHARDED_KEYSPACE); SourceRecord record = consumer.remove(); Map prevOffset = record.sourceOffset(); diff --git a/src/test/java/io/debezium/connector/vitess/connection/TransactionalMessageTest.java b/src/test/java/io/debezium/connector/vitess/connection/TransactionalMessageTest.java new file mode 100644 index 00000000..832b0307 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/connection/TransactionalMessageTest.java @@ -0,0 +1,46 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.connection; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Instant; + +import org.junit.Test; + +import io.debezium.connector.vitess.TestHelper; + +/** + * @author Thomas Thornton + */ +public class TransactionalMessageTest { + + @Test + public void shouldInstantiateBeginMessage() { + ReplicationMessage message = new TransactionalMessage( + ReplicationMessage.Operation.BEGIN, "tx_id", Instant.now(), TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + assertThat(message.isTransactionalMessage()).isTrue(); + assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.BEGIN); + } + + @Test + public void shouldInstantiateCommitMessage() { + ReplicationMessage message = new TransactionalMessage( + ReplicationMessage.Operation.COMMIT, "tx_id", Instant.now(), TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + assertThat(message.isTransactionalMessage()).isTrue(); + assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.COMMIT); + } + + @Test + public void shouldThrowExceptionForNonTransactionalOperation() { + assertThatThrownBy(() -> { + new TransactionalMessage( + ReplicationMessage.Operation.INSERT, "tx_id", Instant.now(), TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("TransactionalMessage can only have BEGIN or COMMIT operations"); + } +}