Skip to content

Commit

Permalink
DBZ-8154 Refactor getVitessTaskValuePerShard
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn authored and jpechane committed Aug 21, 2024
1 parent 3190d30 commit 40a903c
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions src/main/java/io/debezium/connector/vitess/VitessConnectorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
Expand Down Expand Up @@ -153,15 +154,17 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
public Configuration getConfigWithOffsets(Configuration config) {
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(config);
if (connectorConfig.offsetStoragePerTask()) {
config = getVitessTaskValuePerShard(config, connectorConfig, VitessOffsetRetriever.ValueType.GTID);
Object vgtid = getVitessTaskValuePerShard(connectorConfig, VitessOffsetRetriever.ValueType.GTID);
config = config.edit().with(VitessConnectorConfig.VITESS_TASK_VGTID_CONFIG, vgtid).build();
if (VitessOffsetRetriever.isShardEpochMapEnabled(connectorConfig)) {
config = getVitessTaskValuePerShard(config, connectorConfig, VitessOffsetRetriever.ValueType.EPOCH);
Object shardEpochMap = getVitessTaskValuePerShard(connectorConfig, VitessOffsetRetriever.ValueType.EPOCH);
config = config.edit().with(VitessConnectorConfig.VITESS_TASK_SHARD_EPOCH_MAP_CONFIG, shardEpochMap).build();
}
}
return config;
}

private Configuration getVitessTaskValuePerShard(Configuration config, VitessConnectorConfig connectorConfig, VitessOffsetRetriever.ValueType valueType) {
private Object getVitessTaskValuePerShard(VitessConnectorConfig connectorConfig, VitessOffsetRetriever.ValueType valueType) {
int gen = connectorConfig.getOffsetStorageTaskKeyGen();
int prevGen = gen - 1;
VitessOffsetRetriever prevGenRetriever = new VitessOffsetRetriever(
Expand Down Expand Up @@ -213,16 +216,13 @@ else if (prevGenValuesPerShard != null && prevGenValuesPerShard.containsKey(shar
switch (valueType) {
case GTID:
Map<String, String> gtidsPerShard = (Map) valuesPerShard;
Vgtid vgtid = getVgtid(gtidsPerShard, keyspace);
config = config.edit().with(VitessConnectorConfig.VITESS_TASK_VGTID_CONFIG, vgtid).build();
break;
return getVgtid(gtidsPerShard, keyspace);
case EPOCH:
Map<String, Long> epochsPerShard = (Map) valuesPerShard;
ShardEpochMap shardEpochMap = getShardEpochMap(epochsPerShard);
config = config.edit().with(VitessConnectorConfig.VITESS_TASK_SHARD_EPOCH_MAP_CONFIG, shardEpochMap).build();
break;
return getShardEpochMap(epochsPerShard);
default:
throw new DebeziumException(String.format("Unknown value type %s", valueType.name()));
}
return config;
}

private ShardEpochMap getShardEpochMap(Map<String, Long> epochMap) {
Expand Down

0 comments on commit 40a903c

Please sign in to comment.