From 219ee85d48e82a3fc6aceed2df1c0db5a199e546 Mon Sep 17 00:00:00 2001 From: twthorn Date: Fri, 20 Dec 2024 11:53:42 -0800 Subject: [PATCH] DBZ-8541 Fix format --- .../connector/vitess/pipeline/txmetadata/Gtid.java | 1 - .../pipeline/txmetadata/VitessEpochProvider.java | 12 ++++++++---- .../pipeline/txmetadata/VitessEpochProviderTest.java | 5 ++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java index bf8c822b..808b710e 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/Gtid.java @@ -6,7 +6,6 @@ package io.debezium.connector.vitess.pipeline.txmetadata; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java index 1eee3c9d..6c8a039c 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java @@ -44,17 +44,20 @@ public static Long getEpochForGtid(Long previousEpoch, String previousGtidString if (isGtidOverridden(previousGtidString) && isGtidOverridden(gtidString)) { // GTID was overridden, and the current GTID is an overridden value, still waiting for first transaction return previousEpoch; - } else if (isGtidOverridden(previousGtidString) && !isGtidOverridden(gtidString)) { + } + else if (isGtidOverridden(previousGtidString) && !isGtidOverridden(gtidString)) { // GTID was overridden, received first transaction, increment epoch LOGGER.info("Incrementing epoch: {}", getLogMessageForGtid(previousEpoch, previousGtidString, gtidString)); return previousEpoch + 1; - } else if (isStandardGtid(previousGtidString) && isGtidOverridden(gtidString)) { + } + else if (isStandardGtid(previousGtidString) && isGtidOverridden(gtidString)) { // previous GTID is standard, current GTID is overridden, should not be possible, raise exception String message = String.format("Current GTID cannot be override value if previous is standard: %s", getLogMessageForGtid(previousEpoch, previousGtidString, gtidString)); LOGGER.error(message); throw new DebeziumException(message); - } else { + } + else { // Both GTIDs are standard so parse them return getEpochForStandardGtid(previousEpoch, previousGtidString, gtidString); } @@ -65,7 +68,8 @@ private static Long getEpochForStandardGtid(Long previousEpoch, String previousG Gtid gtid = new Gtid(gtidString); if (gtid.isHostSetSupersetOf(previousGtid)) { return previousEpoch; - } else { + } + else { // Any other case (disjoint set, previous is a superset), VStream has interpreted the previous GTID correctly and sent some new GTID // in a continuous stream, so simply increment the epoch return previousEpoch + 1; diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java index a735d5f9..967132de 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProviderTest.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; -import org.assertj.core.api.Assertions; import org.junit.Test; import io.debezium.DebeziumException; @@ -28,7 +27,6 @@ import io.debezium.connector.vitess.Vgtid; import io.debezium.connector.vitess.VgtidTest; import io.debezium.connector.vitess.VitessConnectorConfig; -import io.debezium.junit.logging.LogInterceptor; public class VitessEpochProviderTest { @@ -226,7 +224,8 @@ public void missingEpochWithPreviousVgtidShouldThrowException() { public void testGtidPartialCurrent() { VitessEpochProvider provider = new VitessEpochProvider(); VitessConnectorConfig config = new VitessConnectorConfig(Configuration.empty()); - provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, new ShardEpochMap(Map.of("f0-f8", 1L, "30-38", 1L, "b0-b8", 1L, "70-78", 1L)).toString()), config); + provider.load(Map.of(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, + new ShardEpochMap(Map.of("f0-f8", 1L, "30-38", 1L, "b0-b8", 1L, "70-78", 1L)).toString()), config); String shard = "f0-f8"; String vgtidAllCurrent = "[" + "{\"keyspace\":\"keyspace1\",\"shard\":\"30-38\",\"gtid\":\"current\",\"table_p_ks\":[]}," +