Skip to content

Commit

Permalink
[Improvment](cdc) Use uniq index as primary key (#516)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Nov 25, 2024
1 parent 68ff6db commit 0611b93
Showing 1 changed file with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
* JdbcSourceSchema is a subclass of SourceSchema, used to build metadata about jdbc-related
Expand All @@ -47,6 +49,10 @@ public JdbcSourceSchema(
super(databaseName, schemaName, tableName, tableComment);
fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName, tableName);
if (primaryKeys.isEmpty()) {
List<String> uniqIndex = getUniqIndex(metaData, databaseName, schemaName, tableName);
primaryKeys.addAll(uniqIndex);
}
}

public LinkedHashMap<String, FieldSchema> getColumnInfo(
Expand Down Expand Up @@ -96,5 +102,32 @@ public List<String> getPrimaryKeys(
return primaryKeys;
}

/**
* Get the unique index of the table If the primary key is empty but there is a uniq key, then
* use the uniqkey instead of the primarykey
*/
public List<String> getUniqIndex(
DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
throws SQLException {
Map<String, List<String>> uniqIndexMap = new HashMap<>();
String firstIndexName = null;
try (ResultSet rs =
metaData.getIndexInfo(databaseName, schemaName, tableName, true, true)) {
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
String indexName = rs.getString("INDEX_NAME");
if (firstIndexName == null) {
firstIndexName = indexName;
}
uniqIndexMap.computeIfAbsent(indexName, k -> new ArrayList<>()).add(columnName);
}
}
if (!uniqIndexMap.isEmpty()) {
// If there are multiple uniq indices, return one
return uniqIndexMap.get(firstIndexName);
}
return new ArrayList<>();
}

public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale);
}

0 comments on commit 0611b93

Please sign in to comment.