Skip to content

Commit

Permalink
[Fix](cdc) Fix sql_parse schema table annotation and field type parsing inaccuracies (#540)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuqinghuang authored Jan 10, 2025
1 parent 1fcc640 commit 1a92f2a
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class SQLParserSchemaManager implements Serializable {
private static final String PRIMARY_KEY = "PRIMARY KEY";
private static final String UNIQUE = "UNIQUE";
private static final String DORIS_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
private static final List<String> TYPE_MODIFIER =
Arrays.asList("UNSIGNED", "ZEROFILL", "PRECISION");
private static final Set<String> sourceConnectorTimeValues =
new HashSet<>(
Arrays.asList(
Expand Down Expand Up @@ -139,15 +141,13 @@ public TableSchema parseCreateTableStatement(
.forEach(
column -> {
String columnName = column.getColumnName();
ColDataType colDataType = column.getColDataType();
String dataType = parseDataType(colDataType, sourceConnector);
List<String> columnSpecs = column.getColumnSpecs();
String defaultValue =
extractDefaultValue(dataType, columnSpecs);
String comment = extractComment(columnSpecs);
FieldSchema fieldSchema =
new FieldSchema(
columnName, dataType, defaultValue, comment);
getFieldSchema(
column.getColumnName(),
column.getColumnSpecs(),
column.getColDataType(),
sourceConnector);
columnFields.put(columnName, fieldSchema);
extractColumnPrimaryKey(columnName, columnSpecs, pkKeys);
});
Expand Down Expand Up @@ -181,6 +181,20 @@ public TableSchema parseCreateTableStatement(
return null;
}

/**
 * Collects the type modifiers found among a column's spec tokens.
 *
 * <p>Every token is upper-cased with {@link Locale#ROOT} and retained only when it is listed in
 * {@code TYPE_MODIFIER}. Retained tokens are emitted in encounter order, each preceded by one
 * space, so the result can be appended straight onto a base type name.
 *
 * @param columnSpecs the column spec tokens; when {@code CollectionUtils.isEmpty} reports the
 *     list as empty, no tokens are inspected
 * @return the space-prefixed, upper-cased modifiers joined together, or an empty string when
 *     nothing matched
 */
private String extractTypeModifier(List<String> columnSpecs) {
    StringBuilder modifiers = new StringBuilder();
    if (!CollectionUtils.isEmpty(columnSpecs)) {
        for (String spec : columnSpecs) {
            String normalized = spec.toUpperCase(Locale.ROOT);
            if (!TYPE_MODIFIER.contains(normalized)) {
                // Not a type modifier (e.g. NOT, NULL, DEFAULT ...): ignore it.
                continue;
            }
            modifiers.append(' ').append(normalized);
        }
    }
    return modifiers.toString();
}

private void extractIndexesPrimaryKey(List<Index> indexes, List<String> pkKeys) {
if (CollectionUtils.isEmpty(indexes)) {
return;
Expand Down Expand Up @@ -215,10 +229,23 @@ private String extractTableComment(List<String> tableOptionsStrings) {
if (CollectionUtils.isEmpty(tableOptionsStrings)) {
return null;
}

for (int i = 0; i < tableOptionsStrings.size(); i++) {
String columnSpec = tableOptionsStrings.get(i);
// If you encounter a COMMENT and the next element is an equal sign (=)
if (COMMENT.equalsIgnoreCase(columnSpec)
&& i + 1 < tableOptionsStrings.size()
&& "=".equals(tableOptionsStrings.get(i + 1))) {
tableOptionsStrings.remove(i + 1);
break;
}
}

return extractAdjacentString(tableOptionsStrings, COMMENT);
}

private String parseDataType(ColDataType colDataType, SourceConnector sourceConnector) {
private String parseDataType(
ColDataType colDataType, String typeModifier, SourceConnector sourceConnector) {
String dataType = colDataType.getDataType();
int length = 0;
int scale = 0;
Expand All @@ -229,7 +256,8 @@ private String parseDataType(ColDataType colDataType, SourceConnector sourceConn
scale = Integer.parseInt(argumentsStringList.get(1));
}
}
return JsonDebeziumChangeUtils.buildDorisTypeName(sourceConnector, dataType, length, scale);
return JsonDebeziumChangeUtils.buildDorisTypeName(
sourceConnector, dataType + typeModifier, length, scale);
}

private String processDropColumnOperation(AlterExpression alterExpression, String dorisTable) {
Expand All @@ -244,21 +272,32 @@ private List<String> processAddColumnOperation(
List<ColumnDataType> colDataTypeList = alterExpression.getColDataTypeList();
List<String> addColumnList = new ArrayList<>();
for (ColumnDataType columnDataType : colDataTypeList) {
String columnName = columnDataType.getColumnName();
ColDataType colDataType = columnDataType.getColDataType();
String datatype = parseDataType(colDataType, sourceConnector);

List<String> columnSpecs = columnDataType.getColumnSpecs();
String defaultValue = extractDefaultValue(datatype, columnSpecs);
String comment = extractComment(columnSpecs);
FieldSchema fieldSchema = new FieldSchema(columnName, datatype, defaultValue, comment);
FieldSchema fieldSchema =
getFieldSchema(
columnDataType.getColumnName(),
columnDataType.getColumnSpecs(),
columnDataType.getColDataType(),
sourceConnector);
String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema);
LOG.info("Parsed add column DDL SQL is: {}", addColumnDDL);
addColumnList.add(addColumnDDL);
}
return addColumnList;
}

/**
 * Assembles the {@link FieldSchema} describing a single column.
 *
 * <p>The type string is resolved first by passing the column's data type, together with any
 * modifiers extracted from its specs, to {@code parseDataType}. That resolved type is then used
 * when extracting the default value, and the comment is read from the same specs.
 *
 * @param columnName name of the column, passed through unchanged
 * @param columnSpecs spec tokens of the column, used for modifiers, default value and comment
 * @param colDataType the parsed column data type
 * @param sourceConnector the source connector the type is resolved for
 * @return a new field schema built from the name, resolved type, default value and comment
 */
private FieldSchema getFieldSchema(
        String columnName,
        List<String> columnSpecs,
        ColDataType colDataType,
        SourceConnector sourceConnector) {
    String resolvedType =
            parseDataType(colDataType, extractTypeModifier(columnSpecs), sourceConnector);
    // Default-value extraction depends on the resolved type, so it must come after it.
    return new FieldSchema(
            columnName,
            resolvedType,
            extractDefaultValue(resolvedType, columnSpecs),
            extractComment(columnSpecs));
}

private String processChangeColumnOperation(
AlterExpression alterExpression, String dorisTable) {
String columnNewName = alterExpression.getColDataTypeList().get(0).getColumnName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ public void testParserAlterDDLs() {
"ALTER TABLE `doris`.`tab` ADD COLUMN `create_time` DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP COMMENT 'time_comment'");
expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c10` `c11`");
expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c12` `c13`");
expectDDLs.add(
"ALTER TABLE `doris`.`tab` ADD COLUMN `card` LARGEINT COMMENT 'card_comment'");

SourceConnector mysql = SourceConnector.MYSQL;
String ddl =
"alter table t1 drop c1, drop column c2, add c3 int default 100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment', add `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change column c12 c13 varchar(10)";
"alter table t1 drop c1, drop column c2, add c3 int default 100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment', add `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change column c12 c13 varchar(10), add card bigint(20) unsigned NOT NULL COMMENT 'card_comment'";
List<String> actualDDLs = schemaManager.parseAlterDDLs(mysql, ddl, dorisTable);
for (String actualDDL : actualDDLs) {
Assert.assertTrue(expectDDLs.contains(actualDDL));
Expand Down Expand Up @@ -257,13 +259,38 @@ public void testParseCreateTableStatement() {
+ " `decimal_type3` decimal(38,9) DEFAULT '1.123456789' COMMENT 'comment_test',\n"
+ " `create_time3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n"
+ " PRIMARY KEY (`id`)\n"
+ ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci";
+ ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci comment='test_sinka'";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
SourceConnector.MYSQL, ddl, dorisTable, null);

String expected =
"TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}";
Assert.assertEquals(expected, tableSchema.toString());
}

@Test
public void testParseCreateTableUnsignedStatement() {
String dorisTable = "doris.auto_tab";
String ddl =
"CREATE TABLE `test_sinka` (\n"
+ " `id` BIGINT NOT NULL DEFAULT '10000' COMMENT 'id_test',\n"
+ " `id2` BIGINT UNSIGNED ZEROFILL NOT NULL DEFAULT '10000' COMMENT 'id2_comment',\n"
+ " `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3),\n"
+ " `c1` int DEFAULT '999',\n"
+ " `decimal_type` decimal(9,3) DEFAULT '1.000' COMMENT 'decimal_tes',\n"
+ " `aaa` varchar(100) DEFAULT NULL,\n"
+ " `decimal_type3` decimal(38,9) DEFAULT '1.123456789' COMMENT 'comment_test',\n"
+ " `create_time3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n"
+ " PRIMARY KEY (`id`)\n"
+ ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci comment 'test_sinka'";
TableSchema tableSchema =
schemaManager.parseCreateTableStatement(
SourceConnector.MYSQL, ddl, dorisTable, null);

String expected =
"TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}";
"TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='BIGINT', defaultValue='10000', comment='id_test'}, `id2`=FieldSchema{name='`id2`', typeString='LARGEINT', defaultValue='10000', comment='id2_comment'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.000', comment='decimal_tes'}, `aaa`=FieldSchema{name='`aaa`', typeString='VARCHAR(300)', defaultValue='NULL', comment='null'}, `decimal_type3`=FieldSchema{name='`decimal_type3`', typeString='DECIMALV3(38,9)', defaultValue='1.123456789', comment='comment_test'}, `create_time3`=FieldSchema{name='`create_time3`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='ttime_aaa'}}, keys=`id`, model=UNIQUE, distributeKeys=`id`, properties={}, tableBuckets=null}";
System.out.println(tableSchema.toString());
Assert.assertEquals(expected, tableSchema.toString());
}

Expand Down

0 comments on commit 1a92f2a

Please sign in to comment.