Skip to content

Commit

Permalink
DBZ-7628 Use COMMIT time for events, not BEGIN time
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Mar 11, 2024
1 parent 8af2d88 commit e0aef30
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@

import binlogdata.Binlogdata;

import java.time.Instant;

/** Decode VStream gRPC VEvent and process it with the ReplicationMessageProcessor. */
public interface MessageDecoder {

void processMessage(Binlogdata.VEvent event, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction)
throws InterruptedException;

void setCommitTimestamp(Instant commitTimestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package io.debezium.connector.vitess.connection;

import static io.debezium.connector.vitess.connection.ReplicationMessage.Column;

import java.sql.Types;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,6 +49,10 @@ public VStreamOutputMessageDecoder(VitessDatabaseSchema schema) {
this.schema = schema;
}

public void setCommitTimestamp(Instant commitTimestamp) {
this.commitTimestamp = commitTimestamp;
}

@Override
public void processMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction)
throws InterruptedException {
Expand Down Expand Up @@ -85,29 +87,29 @@ public void processMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor

private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
}
processor.process(
new DdlMessage(transactionId, commitTimestamp), newVgtid, false);
new DdlMessage(transactionId, eventTimestamp), newVgtid, false);
}

private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
}
processor.process(
new OtherMessage(transactionId, commitTimestamp), newVgtid, false);
new OtherMessage(transactionId, eventTimestamp), newVgtid, false);
}

private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id.
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
Expand All @@ -117,23 +119,23 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc
LOGGER.info("Skip processing BEGIN because no VGTID was received");
return;
}
LOGGER.trace("Commit timestamp of begin transaction: {}", commitTimestamp);
LOGGER.trace("Timestamp of begin transaction: {}", eventTimestamp);
processor.process(
new TransactionalMessage(Operation.BEGIN, transactionId, commitTimestamp), newVgtid, false);
new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp), newVgtid, false);
}

private void handleCommitMessage(
Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
Instant commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Transaction ID must not be null in TransactionalMessage.
if (this.transactionId == null) {
LOGGER.info("Skip processing COMMIT because no VGTID was received");
return;
}
LOGGER.trace("Commit timestamp of commit transaction: {}", commitTimestamp);
LOGGER.trace("Timestamp of commit transaction: {}", commitTimestamp);
processor.process(
new TransactionalMessage(Operation.COMMIT, transactionId, commitTimestamp), newVgtid, false);
new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp), newVgtid, false);
}

private void decodeRows(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.connector.vitess.connection;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -167,6 +168,7 @@ public void onNext(Vtgate.VStreamResponse response) {
return;
}
commitEventSeen = true;
messageDecoder.setCommitTimestamp(Instant.ofEpochSecond(event.getTimestamp()));
break;
case DDL:
case OTHER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

import java.sql.Connection;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -293,6 +295,113 @@ public void shouldHaveVgtidInResponse() throws Exception {
}
}

@Test
public void shouldSendCommitTimestamp() throws Exception {
// setup fixture
final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig().build());
final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(
conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf));

AtomicReference<Throwable> error = new AtomicReference<>();
try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) {
Vgtid startingVgtid = Vgtid.of(
Binlogdata.VGtid.newBuilder()
.addShardGtids(
Binlogdata.ShardGtid.newBuilder()
.setKeyspace(conf.getKeyspace())
.setShard(conf.getShard().get(0))
.setGtid(Vgtid.CURRENT_GTID)
.build())
.build());

BlockingQueue<MessageAndVgtid> consumedMessages = new ArrayBlockingQueue<>(100);
AtomicBoolean started = new AtomicBoolean(false);
connection.startStreaming(
startingVgtid,
(message, vgtid, isLastRowEventOfTransaction) -> {
if (!started.get()) {
started.set(true);
}
consumedMessages.add(new MessageAndVgtid(message, vgtid));
},
error);
// Since we are using the "current" as the starting position, there is a race here
// if we execute INSERT_STMT before the vstream starts we will never receive the update
// therefore, we wait until the stream is setup and then do the insertion
Awaitility
.await()
.atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
.until(started::get);
consumedMessages.clear();
int expectedNumOfMessages = 3;
MySQLConnection testConnection = MySQLConnection.forTestDatabase(TEST_UNSHARDED_KEYSPACE);
testConnection.setAutoCommit(false);
testConnection.executeWithoutCommitting("BEGIN");
Thread.sleep(1000);
testConnection.executeWithoutCommitting(TestHelper.INSERT_STMT);
Thread.sleep(1000);
testConnection.executeWithoutCommitting("COMMIT");
List<MessageAndVgtid> messages = awaitMessages(
TestHelper.waitTimeForRecords(),
SECONDS,
expectedNumOfMessages,
() -> {
try {
return consumedMessages.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
return null;
}
});

messages.forEach(m -> assertValidVgtid(m.getVgtid(), conf.getKeyspace(), conf.getShard().get(0)));
assertThat(messages.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
Instant beginTime = messages.get(0).getMessage().getCommitTime();
assertThat(messages.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
Instant insertTime = messages.get(1).getMessage().getCommitTime();
assertThat(messages.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
Instant commitTime = messages.get(2).getMessage().getCommitTime();
assertThat(beginTime).isNotEqualTo(commitTime);
assertThat(insertTime).isEqualTo(commitTime);

testConnection.executeWithoutCommitting("BEGIN");
Thread.sleep(1000);
testConnection.executeWithoutCommitting(TestHelper.INSERT_STMT);
Thread.sleep(1000);
testConnection.executeWithoutCommitting("COMMIT");
List<MessageAndVgtid> messages2 = awaitMessages(
TestHelper.waitTimeForRecords(),
SECONDS,
expectedNumOfMessages,
() -> {
try {
return consumedMessages.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
return null;
}
});

messages2.forEach(m -> assertValidVgtid(m.getVgtid(), conf.getKeyspace(), conf.getShard().get(0)));
assertThat(messages2.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
Instant beginTime2 = messages2.get(0).getMessage().getCommitTime();
assertThat(messages2.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
Instant insertTime2 = messages2.get(1).getMessage().getCommitTime();
assertThat(messages2.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
Instant commitTime2 = messages2.get(2).getMessage().getCommitTime();
assertThat(beginTime2).isNotEqualTo(commitTime2);
assertThat(insertTime2).isEqualTo(commitTime2);

// Verify we use the commit time of the second transaction
assertThat(commitTime).isNotEqualTo(commitTime2);
}
finally {
if (error.get() != null) {
LOGGER.error("Error during streaming", error.get());
}
}
}

@Test
public void shouldCopyAndReplicate() throws Exception {
// setup fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import binlogdata.Binlogdata;

import java.time.Instant;

public class VStreamOutputMessageDecoderTest {
private static final Logger LOGGER = LoggerFactory.getLogger(VStreamOutputMessageDecoderTest.class);

Expand Down Expand Up @@ -474,4 +476,112 @@ public void shouldProcessUpdateEvent() throws Exception {
assertThat(processed[0]).isTrue();
}

@Test
public void shouldSetRowEventsToCommitTimestamp() throws Exception {
// setup fixture
Long expectedBeginTimestamp = 1L;
Long expectedCommitTimestamp = 2L;
Binlogdata.VEvent beginEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.BEGIN)
.setTimestamp(expectedBeginTimestamp)
.build();
Binlogdata.VEvent commitEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.COMMIT)
.setTimestamp(expectedCommitTimestamp)
.build();
decoder.setCommitTimestamp(Instant.ofEpochSecond(commitEvent.getTimestamp()));
decoder.processMessage(TestHelper.defaultFieldEvent(), null, null, false);
schema.tableFor(TestHelper.defaultTableId());
schema.tableFor(TestHelper.defaultTableId());
Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON);

// exercise SUT
// final boolean[] processed = { false };
decoder.processMessage(
beginEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedBeginTimestamp);
},
newVgtid,
false);
decoder.processMessage(
TestHelper.defaultInsertEvent(),
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
null,
false);
decoder.processMessage(
TestHelper.defaultUpdateEvent(),
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
null,
false);
decoder.processMessage(
TestHelper.defaultDeleteEvent(),
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
null,
false);
decoder.processMessage(
commitEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
newVgtid,
false);
}

@Test
public void shouldSetOtherEventsToEventTimestamp() throws Exception {
// setup fixture
Long expectedEventTimestamp = 1L;
Long expectedCommitTimestamp = 2L;
Binlogdata.VEvent otherEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.OTHER)
.setTimestamp(expectedEventTimestamp)
.build();
Binlogdata.VEvent ddlEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.DDL)
.setTimestamp(expectedEventTimestamp)
.build();
Binlogdata.VEvent commitEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.COMMIT)
.setTimestamp(expectedCommitTimestamp)
.build();
decoder.setCommitTimestamp(Instant.ofEpochSecond(commitEvent.getTimestamp()));
Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON);

decoder.processMessage(
otherEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedEventTimestamp);
},
newVgtid,
false);
decoder.processMessage(
ddlEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedEventTimestamp);
},
null,
false);
decoder.processMessage(
commitEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
null,
false);
}
}

0 comments on commit e0aef30

Please sign in to comment.