Skip to content

Commit

Permalink
DBZ-8594 Add validations that all transactional messages are only com…
Browse files Browse the repository at this point in the history
…mit or begin
  • Loading branch information
twthorn authored and jpechane committed Jan 30, 2025
1 parent aba083d commit ec75770
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private ReplicationMessageProcessor newReplicationMessageProcessor(VitessPartiti
VitessTransactionInfo transactionInfo = new VitessTransactionInfo(message.getTransactionId(), message.getShard());
dispatcher.dispatchTransactionStartedEvent(partition, transactionInfo, offsetContext, message.getCommitTime());
}
else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) {
else {
// When COMMIT event is received, all events have been processed except for this COMMIT event
// We reset the VGTID such that current & restart VGTIDs are equal to this transaction's VGTID
// We send one final event (transaction committed), the offset will only be committed if that event
Expand All @@ -118,7 +118,6 @@ else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) {
// Send a heartbeat event if time has elapsed
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
return;
}
else if (message.getOperation() == ReplicationMessage.Operation.OTHER) {
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public class TransactionalMessage implements ReplicationMessage {
private final String shard;

public TransactionalMessage(Operation operation, String transactionId, Instant commitTime, String keyspace, String shard) {
if (operation != Operation.BEGIN && operation != Operation.COMMIT) {
throw new IllegalArgumentException("TransactionalMessage can only have BEGIN or COMMIT operations");
}
this.transactionId = transactionId;
this.commitTime = commitTime;
this.operation = operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2169,7 +2169,8 @@ private void testOffsetStorage(boolean offsetStoragePerTask) throws Exception {
startConnector(Function.identity(), hasMultipleShards, offsetStoragePerTask, numTasks, gen, 1, null, "");

consumer = testConsumer(1);
executeAndWait("INSERT INTO pk_single_unique_key_table (id, int_col) VALUES (1, 1);", TEST_UNSHARDED_KEYSPACE);
executeAndWait("INSERT INTO pk_single_unique_key_table (id, int_col) VALUES (1, 1);",
TEST_UNSHARDED_KEYSPACE);

SourceRecord record = consumer.remove();
Map<String, ?> prevOffset = record.sourceOffset();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.connection;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.time.Instant;

import org.junit.Test;

import io.debezium.connector.vitess.TestHelper;

/**
* @author Thomas Thornton
*/
public class TransactionalMessageTest {

@Test
public void shouldInstantiateBeginMessage() {
ReplicationMessage message = new TransactionalMessage(
ReplicationMessage.Operation.BEGIN, "tx_id", Instant.now(), TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
assertThat(message.isTransactionalMessage()).isTrue();
assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.BEGIN);
}

@Test
public void shouldInstantiateCommitMessage() {
ReplicationMessage message = new TransactionalMessage(
ReplicationMessage.Operation.COMMIT, "tx_id", Instant.now(), TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
assertThat(message.isTransactionalMessage()).isTrue();
assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.COMMIT);
}

@Test
public void shouldThrowExceptionForNonTransactionalOperation() {
assertThatThrownBy(() -> {
new TransactionalMessage(
ReplicationMessage.Operation.INSERT, "tx_id", Instant.now(), TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
}).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("TransactionalMessage can only have BEGIN or COMMIT operations");
}
}

0 comments on commit ec75770

Please sign in to comment.