diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 3d5bf362..5424b8af 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -69,6 +69,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; +import java.util.stream.Collectors; /** @@ -162,12 +163,17 @@ protected boolean isRedirectable(String method) { return httpClientBuilder.build(); } - private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC) { + private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC, StructType schema) { HttpPut httpPut = new HttpPut(loadUrlStr); addCommonHeader(httpPut); httpPut.setHeader("label", label); if (StringUtils.isNotBlank(columns)) { httpPut.setHeader("columns", columns); + } else { + if (schema != null && !schema.isEmpty()) { + String dfColumns = Arrays.stream(schema.fieldNames()).collect(Collectors.joining(",")); + httpPut.setHeader("columns", dfColumns); + } } if (StringUtils.isNotBlank(maxFilterRatio)) { httpPut.setHeader("max_filter_ratio", maxFilterRatio); @@ -210,7 +216,7 @@ public long load(Iterator rows, StructType schema) try (CloseableHttpClient httpClient = getHttpClient()) { String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl); this.loadUrlStr = loadUrlStr; - HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC); + HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema); RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows) .format(fileType) .sep(FIELD_DELIMITER)