Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7628 Use COMMIT time for events, not BEGIN time #184

Merged
merged 2 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.connector.vitess.connection;

import java.time.Instant;

import io.debezium.connector.vitess.Vgtid;

import binlogdata.Binlogdata;
Expand All @@ -14,4 +16,6 @@ public interface MessageDecoder {

void processMessage(Binlogdata.VEvent event, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction)
throws InterruptedException;

void setCommitTimestamp(Instant commitTimestamp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package io.debezium.connector.vitess.connection;

import static io.debezium.connector.vitess.connection.ReplicationMessage.Column;

import java.sql.Types;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,6 +49,10 @@ public VStreamOutputMessageDecoder(VitessDatabaseSchema schema) {
this.schema = schema;
}

public void setCommitTimestamp(Instant commitTimestamp) {
this.commitTimestamp = commitTimestamp;
}

@Override
public void processMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction)
throws InterruptedException {
Expand Down Expand Up @@ -85,29 +87,29 @@ public void processMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor

private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
}
processor.process(
new DdlMessage(transactionId, commitTimestamp), newVgtid, false);
new DdlMessage(transactionId, eventTimestamp), newVgtid, false);
}

private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
}
processor.process(
new OtherMessage(transactionId, commitTimestamp), newVgtid, false);
new OtherMessage(transactionId, eventTimestamp), newVgtid, false);
}

private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Use the entire VGTID as transaction id.
if (newVgtid != null) {
this.transactionId = newVgtid.toString();
Expand All @@ -117,23 +119,23 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc
LOGGER.info("Skip processing BEGIN because no VGTID was received");
return;
}
LOGGER.trace("Commit timestamp of begin transaction: {}", commitTimestamp);
LOGGER.trace("Timestamp of begin transaction: {}", eventTimestamp);
processor.process(
new TransactionalMessage(Operation.BEGIN, transactionId, commitTimestamp), newVgtid, false);
new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp), newVgtid, false);
}

private void handleCommitMessage(
Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
Instant commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
// Transaction ID must not be null in TransactionalMessage.
if (this.transactionId == null) {
LOGGER.info("Skip processing COMMIT because no VGTID was received");
return;
}
LOGGER.trace("Commit timestamp of commit transaction: {}", commitTimestamp);
LOGGER.trace("Timestamp of commit transaction: {}", commitTimestamp);
processor.process(
new TransactionalMessage(Operation.COMMIT, transactionId, commitTimestamp), newVgtid, false);
new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp), newVgtid, false);
}

private void decodeRows(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package io.debezium.connector.vitess.connection;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -167,6 +168,7 @@ public void onNext(Vtgate.VStreamResponse response) {
return;
}
commitEventSeen = true;
messageDecoder.setCommitTimestamp(Instant.ofEpochSecond(event.getTimestamp()));
break;
case DDL:
case OTHER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.junit.Assert.fail;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -293,6 +294,113 @@ public void shouldHaveVgtidInResponse() throws Exception {
}
}

@Test
public void shouldSendCommitTimestamp() throws Exception {
// setup fixture
final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig().build());
final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(
conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf));

AtomicReference<Throwable> error = new AtomicReference<>();
try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) {
Vgtid startingVgtid = Vgtid.of(
Binlogdata.VGtid.newBuilder()
.addShardGtids(
Binlogdata.ShardGtid.newBuilder()
.setKeyspace(conf.getKeyspace())
.setShard(conf.getShard().get(0))
.setGtid(Vgtid.CURRENT_GTID)
.build())
.build());

BlockingQueue<MessageAndVgtid> consumedMessages = new ArrayBlockingQueue<>(100);
AtomicBoolean started = new AtomicBoolean(false);
connection.startStreaming(
startingVgtid,
(message, vgtid, isLastRowEventOfTransaction) -> {
if (!started.get()) {
started.set(true);
}
consumedMessages.add(new MessageAndVgtid(message, vgtid));
},
error);
// Since we are using the "current" as the starting position, there is a race here
// if we execute INSERT_STMT before the vstream starts we will never receive the update
// therefore, we wait until the stream is setup and then do the insertion
Awaitility
.await()
.atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
.until(started::get);
consumedMessages.clear();
int expectedNumOfMessages = 3;
MySQLConnection testConnection = MySQLConnection.forTestDatabase(TEST_UNSHARDED_KEYSPACE);
testConnection.setAutoCommit(false);
testConnection.executeWithoutCommitting("BEGIN");
Thread.sleep(1000);
testConnection.executeWithoutCommitting(TestHelper.INSERT_STMT);
Thread.sleep(1000);
testConnection.executeWithoutCommitting("COMMIT");
List<MessageAndVgtid> messages = awaitMessages(
TestHelper.waitTimeForRecords(),
SECONDS,
expectedNumOfMessages,
() -> {
try {
return consumedMessages.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
return null;
}
});

messages.forEach(m -> assertValidVgtid(m.getVgtid(), conf.getKeyspace(), conf.getShard().get(0)));
assertThat(messages.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
Instant beginTime = messages.get(0).getMessage().getCommitTime();
assertThat(messages.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
Instant insertTime = messages.get(1).getMessage().getCommitTime();
assertThat(messages.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
Instant commitTime = messages.get(2).getMessage().getCommitTime();
assertThat(beginTime).isNotEqualTo(commitTime);
assertThat(insertTime).isEqualTo(commitTime);

testConnection.executeWithoutCommitting("BEGIN");
Thread.sleep(1000);
testConnection.executeWithoutCommitting(TestHelper.INSERT_STMT);
Thread.sleep(1000);
testConnection.executeWithoutCommitting("COMMIT");
List<MessageAndVgtid> messages2 = awaitMessages(
TestHelper.waitTimeForRecords(),
SECONDS,
expectedNumOfMessages,
() -> {
try {
return consumedMessages.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
return null;
}
});

messages2.forEach(m -> assertValidVgtid(m.getVgtid(), conf.getKeyspace(), conf.getShard().get(0)));
assertThat(messages2.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
Instant beginTime2 = messages2.get(0).getMessage().getCommitTime();
assertThat(messages2.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
Instant insertTime2 = messages2.get(1).getMessage().getCommitTime();
assertThat(messages2.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
Instant commitTime2 = messages2.get(2).getMessage().getCommitTime();
assertThat(beginTime2).isNotEqualTo(commitTime2);
assertThat(insertTime2).isEqualTo(commitTime2);

// Verify we use the commit time of the second transaction
assertThat(commitTime).isNotEqualTo(commitTime2);
}
finally {
if (error.get() != null) {
LOGGER.error("Error during streaming", error.get());
}
}
}

@Test
public void shouldCopyAndReplicate() throws Exception {
// setup fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.time.Instant;

import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -474,4 +476,110 @@ public void shouldProcessUpdateEvent() throws Exception {
assertThat(processed[0]).isTrue();
}

@Test
public void shouldSetRowEventsToCommitTimestamp() throws Exception {
// setup fixture
Long expectedBeginTimestamp = 1L;
Long expectedCommitTimestamp = 2L;
Binlogdata.VEvent beginEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.BEGIN)
.setTimestamp(expectedBeginTimestamp)
.build();
Binlogdata.VEvent commitEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.COMMIT)
.setTimestamp(expectedCommitTimestamp)
.build();
decoder.setCommitTimestamp(Instant.ofEpochSecond(commitEvent.getTimestamp()));
decoder.processMessage(TestHelper.defaultFieldEvent(), null, null, false);
schema.tableFor(TestHelper.defaultTableId());
schema.tableFor(TestHelper.defaultTableId());
Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON);

// exercise SUT
decoder.processMessage(
beginEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedBeginTimestamp);
},
newVgtid,
false);
decoder.processMessage(
TestHelper.defaultInsertEvent(),
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
null,
false);
decoder.processMessage(
TestHelper.defaultUpdateEvent(),
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
null,
false);
decoder.processMessage(
TestHelper.defaultDeleteEvent(),
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
null,
false);
decoder.processMessage(
commitEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
newVgtid,
false);
}

@Test
public void shouldSetOtherEventsToEventTimestamp() throws Exception {
Long expectedEventTimestamp = 1L;
Long expectedCommitTimestamp = 2L;
Binlogdata.VEvent otherEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.OTHER)
.setTimestamp(expectedEventTimestamp)
.build();
Binlogdata.VEvent ddlEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.DDL)
.setTimestamp(expectedEventTimestamp)
.build();
Binlogdata.VEvent commitEvent = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.COMMIT)
.setTimestamp(expectedCommitTimestamp)
.build();
decoder.setCommitTimestamp(Instant.ofEpochSecond(commitEvent.getTimestamp()));
Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON);

decoder.processMessage(
otherEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedEventTimestamp);
},
newVgtid,
false);
decoder.processMessage(
ddlEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedEventTimestamp);
},
null,
false);
decoder.processMessage(
commitEvent,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp);
},
null,
false);
}
}