diff --git a/src/main/java/io/debezium/connector/vitess/OffsetValueType.java b/src/main/java/io/debezium/connector/vitess/OffsetValueType.java new file mode 100644 index 00000000..cc9eb5f0 --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/OffsetValueType.java @@ -0,0 +1,116 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; + +public enum OffsetValueType { + + GTID(SourceInfo.VGTID_KEY, OffsetValueType::parseGtid, + OffsetValueType::getVgtid, OffsetValueType::getConfigGtidsPerShard), + EPOCH(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, OffsetValueType::parseEpoch, + OffsetValueType::getShardEpochMap, OffsetValueType::getConfigShardEpochMapPerShard); + + public final String name; + public final Function> parserFunction; + public final BiFunction, String, Object> conversionFunction; + public final BiFunction, Map> configValuesFunction; + + OffsetValueType(String typeName, Function> parserFunction, + BiFunction, String, Object> conversionFunction, + BiFunction, Map> configValuesFunction) { + this.name = typeName; + this.parserFunction = parserFunction; + this.conversionFunction = conversionFunction; + this.configValuesFunction = configValuesFunction; + } + + private static Map parseGtid(String vgtidStr) { + Map shardToGtid = new HashMap<>(); + List shardGtids = Vgtid.of(vgtidStr).getShardGtids(); + for (Vgtid.ShardGtid shardGtid : shardGtids) { + shardToGtid.put(shardGtid.getShard(), shardGtid.getGtid()); + } + return shardToGtid; + } + + private static Map parseEpoch(String epochString) { + ShardEpochMap shardToEpoch = ShardEpochMap.of(epochString); + return (Map) shardToEpoch.getMap(); + } + + /** + * Get the {@link ShardEpochMap} from this map of shards to epochs. + * + * @param epochMap Map of shards to epoch values + * @param keyspace Needed to match the function signature of getVgtid, ignored + * @return The {@link ShardEpochMap} + */ + static ShardEpochMap getShardEpochMap(Map epochMap, String keyspace) { + return new ShardEpochMap((Map) epochMap); + } + + static Vgtid getVgtid(Map gtidsPerShard, String keyspace) { + List shardGtids = new ArrayList(); + for (Map.Entry entry : gtidsPerShard.entrySet()) { + shardGtids.add(new Vgtid.ShardGtid(keyspace, entry.getKey(), (String) entry.getValue())); + } + return Vgtid.of(shardGtids); + } + + static Map getConfigShardEpochMapPerShard(VitessConnectorConfig connectorConfig, List shards) { + String shardEpochMapString = connectorConfig.getShardEpochMap(); + Function initEpoch = x -> 0L; + Map shardEpochMap; + if (shardEpochMapString.isEmpty()) { + shardEpochMap = buildMap(shards, initEpoch); + } + else { + shardEpochMap = ShardEpochMap.of(shardEpochMapString).getMap(); + } + return (Map) shardEpochMap; + } + + static Map getConfigGtidsPerShard(VitessConnectorConfig connectorConfig, List shards) { + String gtids = connectorConfig.getVgtid(); + Map configGtidsPerShard = null; + if (shards != null && gtids.equals(Vgtid.EMPTY_GTID)) { + Function emptyGtid = x -> Vgtid.EMPTY_GTID; + configGtidsPerShard = buildMap(shards, emptyGtid); + } + else if (shards != null && gtids.equals(Vgtid.CURRENT_GTID)) { + Function currentGtid = x -> Vgtid.CURRENT_GTID; + configGtidsPerShard = buildMap(shards, currentGtid); + } + else if (shards != null) { + List shardGtids = Vgtid.of(gtids).getShardGtids(); + Map shardsToGtid = new HashMap<>(); + for (Vgtid.ShardGtid shardGtid : shardGtids) { + shardsToGtid.put(shardGtid.getShard(), shardGtid.getGtid()); + } + Function shardGtid = (i -> shardsToGtid.get(shards.get(i))); + configGtidsPerShard = buildMap(shards, shardGtid); + } + return (Map) configGtidsPerShard; + } + + private static Map buildMap(List keys, Function function) { + return IntStream.range(0, keys.size()) + .boxed() + .collect(Collectors.toMap(keys::get, function)); + } +} diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnector.java b/src/main/java/io/debezium/connector/vitess/VitessConnector.java index 37ca08cd..8043aedb 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnector.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnector.java @@ -106,14 +106,14 @@ public List> taskConfigs(int maxTasks, List currentS connectorConfig, tasks, gen, false, context().offsetStorageReader()); Map gtidsPerShard = currentGen.getGtidPerShard(); - validateCurrentGen(currentGen, gtidsPerShard, currentShards, VitessOffsetRetriever.ValueType.GTID); + validateCurrentGen(currentGen, gtidsPerShard, currentShards, OffsetValueType.GTID); List shards = determineShards(prevGtidsPerShard, gtidsPerShard, currentShards); if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) { Map prevEpochsPerShard = previousGen.getEpochPerShard(); validateNoLostShardData(prevEpochsPerShard, currentShards, "epochs"); Map epochsPerShard = currentGen.getEpochPerShard(); - validateCurrentGen(currentGen, epochsPerShard, currentShards, VitessOffsetRetriever.ValueType.EPOCH); + validateCurrentGen(currentGen, epochsPerShard, currentShards, OffsetValueType.EPOCH); List shardsFromEpoch = determineShards(prevEpochsPerShard, epochsPerShard, currentShards); if (!shardsFromEpoch.equals(shards)) { throw new IllegalArgumentException(String.format( @@ -180,7 +180,7 @@ private static void validateGeneration(Map prevGtidsPerShard, in } private Map validateCurrentGen(VitessOffsetRetriever retriever, Map valuePerShard, List currentShards, - VitessOffsetRetriever.ValueType valueType) { + OffsetValueType valueType) { if (valuePerShard != null && !hasSameShards(valuePerShard.keySet(), currentShards)) { LOGGER.warn("Some shards {} for the current generation {} are not persisted. Expected shards: {}", valueType.name(), valuePerShard.keySet(), currentShards); diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java index c228374a..64609f91 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java @@ -5,20 +5,15 @@ */ package io.debezium.connector.vitess; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.DebeziumException; import io.debezium.annotation.VisibleForTesting; import io.debezium.bean.StandardBeanNames; import io.debezium.config.CommonConnectorConfig; @@ -29,7 +24,6 @@ import io.debezium.connector.vitess.connection.ReplicationConnection; import io.debezium.connector.vitess.connection.VitessReplicationConnection; import io.debezium.connector.vitess.metrics.VitessChangeEventSourceMetricsFactory; -import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; import io.debezium.pipeline.ChangeEventSourceCoordinator; import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.ErrorHandler; @@ -154,17 +148,17 @@ protected ChangeEventSourceCoordinator sta public Configuration getConfigWithOffsets(Configuration config) { VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config); if (connectorConfig.offsetStoragePerTask()) { - Object vgtid = getVitessTaskValuePerShard(connectorConfig, VitessOffsetRetriever.ValueType.GTID); + Object vgtid = getVitessTaskValuePerShard(connectorConfig, OffsetValueType.GTID); config = config.edit().with(VitessConnectorConfig.VITESS_TASK_VGTID_CONFIG, vgtid).build(); if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) { - Object shardEpochMap = getVitessTaskValuePerShard(connectorConfig, VitessOffsetRetriever.ValueType.EPOCH); + Object shardEpochMap = getVitessTaskValuePerShard(connectorConfig, OffsetValueType.EPOCH); config = config.edit().with(VitessConnectorConfig.VITESS_TASK_SHARD_EPOCH_MAP_CONFIG, shardEpochMap).build(); } } return config; } - private Object getVitessTaskValuePerShard(VitessConnectorConfig connectorConfig, VitessOffsetRetriever.ValueType valueType) { + private Object getVitessTaskValuePerShard(VitessConnectorConfig connectorConfig, OffsetValueType valueType) { int gen = connectorConfig.getOffsetStorageTaskKeyGen(); int prevGen = gen - 1; VitessOffsetRetriever prevGenRetriever = new VitessOffsetRetriever( @@ -184,15 +178,7 @@ private Object getVitessTaskValuePerShard(VitessConnectorConfig connectorConfig, Map curGenValuesPerShard = retriever.getValuePerShardFromStorage(valueType); LOGGER.info("{} per shard {}", valueType.name(), curGenValuesPerShard); List shards = connectorConfig.getVitessTaskKeyShards(); - Map configValuesPerShard = null; - switch (valueType) { - case GTID: - configValuesPerShard = getConfigGtidsPerShard(connectorConfig, shards); - break; - case EPOCH: - configValuesPerShard = getConfigShardEpochMapPerShard(connectorConfig, shards); - break; - } + Map configValuesPerShard = valueType.configValuesFunction.apply(connectorConfig, shards); LOGGER.info("config {} per shard {}", valueType.name(), configValuesPerShard); final String keyspace = connectorConfig.getKeyspace(); @@ -213,71 +199,7 @@ else if (prevGenValuesPerShard != null && prevGenValuesPerShard.containsKey(shar } valuesPerShard.put(shard, value); } - switch (valueType) { - case GTID: - Map gtidsPerShard = (Map) valuesPerShard; - return getVgtid(gtidsPerShard, keyspace); - case EPOCH: - Map epochsPerShard = (Map) valuesPerShard; - return getShardEpochMap(epochsPerShard); - default: - throw new DebeziumException(String.format("Unknown value type %s", valueType.name())); - } - } - - private ShardEpochMap getShardEpochMap(Map epochMap) { - return new ShardEpochMap(epochMap); - } - - private Vgtid getVgtid(Map gtidsPerShard, String keyspace) { - List shardGtids = new ArrayList(); - for (Map.Entry entry : gtidsPerShard.entrySet()) { - shardGtids.add(new Vgtid.ShardGtid(keyspace, entry.getKey(), entry.getValue())); - } - return Vgtid.of(shardGtids); - } - - private static Map getConfigShardEpochMapPerShard(VitessConnectorConfig connectorConfig, List shards) { - String shardEpochMapString = connectorConfig.getShardEpochMap(); - Function initEpoch = x -> 0L; - Map shardEpochMap; - if (shardEpochMapString.isEmpty()) { - shardEpochMap = buildMap(shards, initEpoch); - } - else { - shardEpochMap = ShardEpochMap.of(shardEpochMapString).getMap(); - } - return shardEpochMap; - } - - private static Map getConfigGtidsPerShard(VitessConnectorConfig connectorConfig, List shards) { - String gtids = connectorConfig.getVgtid(); - Map configGtidsPerShard = null; - if (shards != null && gtids.equals(Vgtid.EMPTY_GTID)) { - Function emptyGtid = x -> Vgtid.EMPTY_GTID; - configGtidsPerShard = buildMap(shards, emptyGtid); - } - else if (shards != null && gtids.equals(Vgtid.CURRENT_GTID)) { - Function currentGtid = x -> Vgtid.CURRENT_GTID; - configGtidsPerShard = buildMap(shards, currentGtid); - } - else if (shards != null) { - List shardGtids = Vgtid.of(gtids).getShardGtids(); - Map shardsToGtid = new HashMap<>(); - for (Vgtid.ShardGtid shardGtid : shardGtids) { - shardsToGtid.put(shardGtid.getShard(), shardGtid.getGtid()); - } - Function shardGtid = (i -> shardsToGtid.get(shards.get(i))); - configGtidsPerShard = buildMap(shards, shardGtid); - } - LOGGER.info("Found GTIDs per shard in config {}", configGtidsPerShard); - return configGtidsPerShard; - } - - private static Map buildMap(List keys, Function function) { - return IntStream.range(0, keys.size()) - .boxed() - .collect(Collectors.toMap(keys::get, function)); + return valueType.conversionFunction.apply(valuesPerShard, keyspace); } @Override diff --git a/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java b/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java index c5afb34d..2aac4abc 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java +++ b/src/main/java/io/debezium/connector/vitess/VitessOffsetRetriever.java @@ -7,7 +7,6 @@ package io.debezium.connector.vitess; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; @@ -16,8 +15,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; -import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory; /** @@ -50,43 +47,16 @@ public void setExpectsOffset(boolean expectsOffset) { this.expectsOffset = expectsOffset; } - public enum ValueType { - GTID(SourceInfo.VGTID_KEY, ValueType::parseGtid), - EPOCH(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, ValueType::parseEpoch); - - private final String typeName; - private final Function> parserFunction; - - ValueType(String typeName, Function> parserFunction) { - this.typeName = typeName; - this.parserFunction = parserFunction; - } - - private static Map parseGtid(String vgtidStr) { - Map shardToGtid = new HashMap<>(); - List shardGtids = Vgtid.of(vgtidStr).getShardGtids(); - for (Vgtid.ShardGtid shardGtid : shardGtids) { - shardToGtid.put(shardGtid.getShard(), shardGtid.getGtid()); - } - return shardToGtid; - } - - private static Map parseEpoch(String epochString) { - ShardEpochMap shardToEpoch = ShardEpochMap.of(epochString); - return (Map) shardToEpoch.getMap(); - } - } - public Map getGtidPerShard() { - return (Map) getValuePerShardFromStorage(ValueType.GTID); + return (Map) getValuePerShardFromStorage(OffsetValueType.GTID); } public Map getEpochPerShard() { - return (Map) getValuePerShardFromStorage(ValueType.EPOCH); + return (Map) getValuePerShardFromStorage(OffsetValueType.EPOCH); } - public Map getValuePerShardFromStorage(ValueType valueType) { - String key = valueType.typeName; + public Map getValuePerShardFromStorage(OffsetValueType valueType) { + String key = valueType.name; Function> valueReader = valueType.parserFunction; return getValuePerShardFromStorage( key,