diff --git a/src/main/java/io/debezium/connector/vitess/connection/MessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/MessageDecoder.java index 8b08d3e6..77198c54 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/MessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/MessageDecoder.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.vitess.connection; +import java.time.Instant; + import io.debezium.connector.vitess.Vgtid; import binlogdata.Binlogdata; @@ -14,4 +16,6 @@ public interface MessageDecoder { void processMessage(Binlogdata.VEvent event, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) throws InterruptedException; + + void setCommitTimestamp(Instant commitTimestamp); } diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java index 67462fb6..c29aa6e3 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java @@ -5,8 +5,6 @@ */ package io.debezium.connector.vitess.connection; -import static io.debezium.connector.vitess.connection.ReplicationMessage.Column; - import java.sql.Types; import java.time.Instant; import java.util.ArrayList; @@ -51,6 +49,10 @@ public VStreamOutputMessageDecoder(VitessDatabaseSchema schema) { this.schema = schema; } + public void setCommitTimestamp(Instant commitTimestamp) { + this.commitTimestamp = commitTimestamp; + } + @Override public void processMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) throws InterruptedException { @@ -85,29 +87,29 @@ public void processMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException { - this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); + Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); // Use the entire VGTID as transaction id if (newVgtid != null) { this.transactionId = newVgtid.toString(); } processor.process( - new DdlMessage(transactionId, commitTimestamp), newVgtid, false); + new DdlMessage(transactionId, eventTimestamp), newVgtid, false); } private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException { - this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); + Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); // Use the entire VGTID as transaction id if (newVgtid != null) { this.transactionId = newVgtid.toString(); } processor.process( - new OtherMessage(transactionId, commitTimestamp), newVgtid, false); + new OtherMessage(transactionId, eventTimestamp), newVgtid, false); } private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException { - this.commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); + Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); // Use the entire VGTID as transaction id. if (newVgtid != null) { this.transactionId = newVgtid.toString(); @@ -117,23 +119,23 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc LOGGER.info("Skip processing BEGIN because no VGTID was received"); return; } - LOGGER.trace("Commit timestamp of begin transaction: {}", commitTimestamp); + LOGGER.trace("Timestamp of begin transaction: {}", eventTimestamp); processor.process( - new TransactionalMessage(Operation.BEGIN, transactionId, commitTimestamp), newVgtid, false); + new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp), newVgtid, false); } private void handleCommitMessage( Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException { - Instant commitTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); + Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp()); // Transaction ID must not be null in TransactionalMessage. if (this.transactionId == null) { LOGGER.info("Skip processing COMMIT because no VGTID was received"); return; } - LOGGER.trace("Commit timestamp of commit transaction: {}", commitTimestamp); + LOGGER.trace("Timestamp of commit transaction: {}", commitTimestamp); processor.process( - new TransactionalMessage(Operation.COMMIT, transactionId, commitTimestamp), newVgtid, false); + new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp), newVgtid, false); } private void decodeRows(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java index 8f1a3eab..2755ec98 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.vitess.connection; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -167,6 +168,7 @@ public void onNext(Vtgate.VStreamResponse response) { return; } commitEventSeen = true; + messageDecoder.setCommitTimestamp(Instant.ofEpochSecond(event.getTimestamp())); break; case DDL: case OTHER: diff --git a/src/test/java/io/debezium/connector/vitess/VitessReplicationConnectionIT.java b/src/test/java/io/debezium/connector/vitess/VitessReplicationConnectionIT.java index 0b3e7dfc..034fa989 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessReplicationConnectionIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessReplicationConnectionIT.java @@ -11,6 +11,7 @@ import static org.junit.Assert.fail; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -293,6 +294,113 @@ public void shouldHaveVgtidInResponse() throws Exception { } } + @Test + public void shouldSendCommitTimestamp() throws Exception { + // setup fixture + final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig().build()); + final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema( + conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf)); + + AtomicReference error = new AtomicReference<>(); + try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) { + Vgtid startingVgtid = Vgtid.of( + Binlogdata.VGtid.newBuilder() + .addShardGtids( + Binlogdata.ShardGtid.newBuilder() + .setKeyspace(conf.getKeyspace()) + .setShard(conf.getShard().get(0)) + .setGtid(Vgtid.CURRENT_GTID) + .build()) + .build()); + + BlockingQueue consumedMessages = new ArrayBlockingQueue<>(100); + AtomicBoolean started = new AtomicBoolean(false); + connection.startStreaming( + startingVgtid, + (message, vgtid, isLastRowEventOfTransaction) -> { + if (!started.get()) { + started.set(true); + } + consumedMessages.add(new MessageAndVgtid(message, vgtid)); + }, + error); + // Since we are using the "current" as the starting position, there is a race here + // if we execute INSERT_STMT before the vstream starts we will never receive the update + // therefore, we wait until the stream is setup and then do the insertion + Awaitility + .await() + .atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())) + .until(started::get); + consumedMessages.clear(); + int expectedNumOfMessages = 3; + MySQLConnection testConnection = MySQLConnection.forTestDatabase(TEST_UNSHARDED_KEYSPACE); + testConnection.setAutoCommit(false); + testConnection.executeWithoutCommitting("BEGIN"); + Thread.sleep(1000); + testConnection.executeWithoutCommitting(TestHelper.INSERT_STMT); + Thread.sleep(1000); + testConnection.executeWithoutCommitting("COMMIT"); + List messages = awaitMessages( + TestHelper.waitTimeForRecords(), + SECONDS, + expectedNumOfMessages, + () -> { + try { + return consumedMessages.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + return null; + } + }); + + messages.forEach(m -> assertValidVgtid(m.getVgtid(), conf.getKeyspace(), conf.getShard().get(0))); + assertThat(messages.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN"); + Instant beginTime = messages.get(0).getMessage().getCommitTime(); + assertThat(messages.get(1).getMessage().getOperation().name()).isEqualTo("INSERT"); + Instant insertTime = messages.get(1).getMessage().getCommitTime(); + assertThat(messages.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT"); + Instant commitTime = messages.get(2).getMessage().getCommitTime(); + assertThat(beginTime).isNotEqualTo(commitTime); + assertThat(insertTime).isEqualTo(commitTime); + + testConnection.executeWithoutCommitting("BEGIN"); + Thread.sleep(1000); + testConnection.executeWithoutCommitting(TestHelper.INSERT_STMT); + Thread.sleep(1000); + testConnection.executeWithoutCommitting("COMMIT"); + List messages2 = awaitMessages( + TestHelper.waitTimeForRecords(), + SECONDS, + expectedNumOfMessages, + () -> { + try { + return consumedMessages.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + return null; + } + }); + + messages2.forEach(m -> assertValidVgtid(m.getVgtid(), conf.getKeyspace(), conf.getShard().get(0))); + assertThat(messages2.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN"); + Instant beginTime2 = messages2.get(0).getMessage().getCommitTime(); + assertThat(messages2.get(1).getMessage().getOperation().name()).isEqualTo("INSERT"); + Instant insertTime2 = messages2.get(1).getMessage().getCommitTime(); + assertThat(messages2.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT"); + Instant commitTime2 = messages2.get(2).getMessage().getCommitTime(); + assertThat(beginTime2).isNotEqualTo(commitTime2); + assertThat(insertTime2).isEqualTo(commitTime2); + + // Verify we use the commit time of the second transaction + assertThat(commitTime).isNotEqualTo(commitTime2); + } + finally { + if (error.get() != null) { + LOGGER.error("Error during streaming", error.get()); + } + } + } + @Test public void shouldCopyAndReplicate() throws Exception { // setup fixture diff --git a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java index f278f270..da905db5 100644 --- a/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java +++ b/src/test/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoderTest.java @@ -8,6 +8,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.time.Instant; + import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -474,4 +476,110 @@ public void shouldProcessUpdateEvent() throws Exception { assertThat(processed[0]).isTrue(); } + @Test + public void shouldSetRowEventsToCommitTimestamp() throws Exception { + // setup fixture + Long expectedBeginTimestamp = 1L; + Long expectedCommitTimestamp = 2L; + Binlogdata.VEvent beginEvent = Binlogdata.VEvent.newBuilder() + .setType(Binlogdata.VEventType.BEGIN) + .setTimestamp(expectedBeginTimestamp) + .build(); + Binlogdata.VEvent commitEvent = Binlogdata.VEvent.newBuilder() + .setType(Binlogdata.VEventType.COMMIT) + .setTimestamp(expectedCommitTimestamp) + .build(); + decoder.setCommitTimestamp(Instant.ofEpochSecond(commitEvent.getTimestamp())); + decoder.processMessage(TestHelper.defaultFieldEvent(), null, null, false); + schema.tableFor(TestHelper.defaultTableId()); + schema.tableFor(TestHelper.defaultTableId()); + Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON); + + // exercise SUT + decoder.processMessage( + beginEvent, + (message, vgtid, isLastRowEventOfTransaction) -> { + // verify outcome + assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedBeginTimestamp); + }, + newVgtid, + false); + decoder.processMessage( + TestHelper.defaultInsertEvent(), + (message, vgtid, isLastRowEventOfTransaction) -> { + // verify outcome + assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp); + }, + null, + false); + decoder.processMessage( + TestHelper.defaultUpdateEvent(), + (message, vgtid, isLastRowEventOfTransaction) -> { + // verify outcome + assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp); + }, + null, + false); + decoder.processMessage( + TestHelper.defaultDeleteEvent(), + (message, vgtid, isLastRowEventOfTransaction) -> { + // verify outcome + assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp); + }, + null, + false); + decoder.processMessage( + commitEvent, + (message, vgtid, isLastRowEventOfTransaction) -> { + // verify outcome + assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp); + }, + newVgtid, + false); + } + + @Test + public void shouldSetOtherEventsToEventTimestamp() throws Exception { + Long expectedEventTimestamp = 1L; + Long expectedCommitTimestamp = 2L; + Binlogdata.VEvent otherEvent = Binlogdata.VEvent.newBuilder() + .setType(Binlogdata.VEventType.OTHER) + .setTimestamp(expectedEventTimestamp) + .build(); + Binlogdata.VEvent ddlEvent = Binlogdata.VEvent.newBuilder() + .setType(Binlogdata.VEventType.DDL) + .setTimestamp(expectedEventTimestamp) + .build(); + Binlogdata.VEvent commitEvent = Binlogdata.VEvent.newBuilder() + .setType(Binlogdata.VEventType.COMMIT) + .setTimestamp(expectedCommitTimestamp) + .build(); + decoder.setCommitTimestamp(Instant.ofEpochSecond(commitEvent.getTimestamp())); + Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON); + + decoder.processMessage( + otherEvent, + (message, vgtid, isLastRowEventOfTransaction) -> { + // verify outcome + assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedEventTimestamp); + }, + newVgtid, + false); + decoder.processMessage( + ddlEvent, + (message, vgtid, isLastRowEventOfTransaction) -> { + // verify outcome + assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedEventTimestamp); + }, + null, + false); + decoder.processMessage( + commitEvent, + (message, vgtid, isLastRowEventOfTransaction) -> { + // verify outcome + assertThat(message.getCommitTime().getEpochSecond()).isEqualTo(expectedCommitTimestamp); + }, + null, + false); + } }