From 7421fe4d2574a4b13f64f5f995b6507e27ba6808 Mon Sep 17 00:00:00 2001 From: huanccwang Date: Thu, 7 Sep 2023 23:31:55 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=BD=93=E6=B2=A1=E6=9C=89=E4=BC=A0columns?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E9=BB=98=E8=AE=A4=E4=BB=A5df=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../doris/spark/load/DorisStreamLoad.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index ac920cd0..bea0caf8 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -16,6 +16,14 @@ // under the License. package org.apache.doris.spark.load; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.SparkSettings; import org.apache.doris.spark.exception.StreamLoadException; @@ -25,15 +33,6 @@ import org.apache.doris.spark.util.DataUtil; import org.apache.doris.spark.util.ListUtils; import org.apache.doris.spark.util.ResponseUtil; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; @@ -50,7 +49,6 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -184,7 +182,7 @@ public List loadV2(List> rows, String[] dfColumns, Boolean try { for (String data : loadData) { - txnIds.add(load(data, enable2PC)); + txnIds.add(load(data, enable2PC, dfColumns)); } } catch (StreamLoadException e) { if (enable2PC && !txnIds.isEmpty()) { @@ -216,7 +214,7 @@ public List loadStream(List> rows, String[] dfColumns, Boo try { for (String data : loadData) { - txnIds.add(load(data, enable2PC)); + txnIds.add(load(data, enable2PC, dfColumns)); } } catch (StreamLoadException e) { if (enable2PC && !txnIds.isEmpty()) { @@ -232,7 +230,7 @@ public List loadStream(List> rows, String[] dfColumns, Boo } - public int load(String value, Boolean enable2PC) throws StreamLoadException { + public int load(String value, Boolean enable2PC, String[] dfColumns) throws StreamLoadException { String label = generateLoadLabel(); @@ -246,6 +244,10 @@ public int load(String value, Boolean enable2PC) throws StreamLoadException { HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC); httpPut.setEntity(new StringEntity(value, StandardCharsets.UTF_8)); + if (StringUtils.isBlank(columns)) { + String dfcColumnsWithComma = Arrays.stream(dfColumns).collect(Collectors.joining(",")); + httpPut.setHeader("columns", dfcColumnsWithComma); + } HttpResponse httpResponse = httpClient.execute(httpPut); responseHttpStatus = httpResponse.getStatusLine().getStatusCode(); String respMsg = httpResponse.getStatusLine().getReasonPhrase(); From dc53a5013bb9789fd65fb4dc4022c5ecf65856a6 Mon Sep 17 00:00:00 2001 From: huanccwang Date: Thu, 7 Sep 2023 23:39:07 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=A4=E7=A9=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index bea0caf8..0e453e96 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -23,6 +23,7 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.SparkSettings; @@ -244,7 +245,7 @@ public int load(String value, Boolean enable2PC, String[] dfColumns) throws Stre HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC); httpPut.setEntity(new StringEntity(value, StandardCharsets.UTF_8)); - if (StringUtils.isBlank(columns)) { + if (StringUtils.isBlank(columns)&& ArrayUtils.isNotEmpty(dfColumns)) { String dfcColumnsWithComma = Arrays.stream(dfColumns).collect(Collectors.joining(",")); httpPut.setHeader("columns", dfcColumnsWithComma); } From c6547fbb6a29c5dc2087a12cc9302c80cf50d280 Mon Sep 17 00:00:00 2001 From: huanccwang Date: Wed, 18 Oct 2023 22:01:59 +0800 Subject: [PATCH 3/4] resolve conflict --- .../doris/spark/load/DorisStreamLoad.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index c524a4c6..71f5e31d 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -16,14 +16,6 @@ // under the License. package org.apache.doris.spark.load; -import org.apache.doris.spark.cfg.ConfigurationOptions; -import org.apache.doris.spark.cfg.SparkSettings; -import org.apache.doris.spark.exception.StreamLoadException; -import org.apache.doris.spark.rest.RestService; -import org.apache.doris.spark.rest.models.BackendV2; -import org.apache.doris.spark.rest.models.RespContent; -import org.apache.doris.spark.util.ResponseUtil; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -31,7 +23,15 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.doris.spark.cfg.ConfigurationOptions; +import org.apache.doris.spark.cfg.SparkSettings; +import org.apache.doris.spark.exception.StreamLoadException; +import org.apache.doris.spark.rest.RestService; +import org.apache.doris.spark.rest.models.BackendV2; +import org.apache.doris.spark.rest.models.RespContent; +import org.apache.doris.spark.util.ResponseUtil; import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; @@ -67,6 +67,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; +import java.util.stream.Collectors; /** @@ -150,12 +151,17 @@ private CloseableHttpClient getHttpClient() { return httpClientBuilder.build(); } - private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC) { + private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC, StructType schema) { HttpPut httpPut = new HttpPut(loadUrlStr); addCommonHeader(httpPut); httpPut.setHeader("label", label); if (StringUtils.isNotBlank(columns)) { httpPut.setHeader("columns", columns); + } else { + if (ObjectUtils.isNotEmpty(schema)) { + String dfColumns = Arrays.stream(schema.fieldNames()).collect(Collectors.joining(",")); + httpPut.setHeader("columns", dfColumns); + } } if (StringUtils.isNotBlank(maxFilterRatio)) { httpPut.setHeader("max_filter_ratio", maxFilterRatio); @@ -198,7 +204,7 @@ public int load(Iterator rows, StructType schema) try (CloseableHttpClient httpClient = getHttpClient()) { String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl); this.loadUrlStr = loadUrlStr; - HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC); + HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema); RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows) .batchSize(batchSize) .format(fileType) @@ -206,6 +212,7 @@ public int load(Iterator rows, StructType schema) .delim(LINE_DELIMITER) .schema(schema) .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough); + Arrays.stream(schema.fieldNames()).collect(Collectors.joining(",")); httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream)); HttpResponse httpResponse = httpClient.execute(httpPut); loadResponse = new LoadResponse(httpResponse); From 3ce2d995228c1add482aafc7cd6c4303168adb0a Mon Sep 17 00:00:00 2001 From: huanccwang Date: Wed, 25 Oct 2023 22:55:21 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=88=A4=E7=A9=BA?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 9edbd6db..87f1457b 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -23,7 +23,6 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.SparkSettings; @@ -155,7 +154,7 @@ private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC, S if (StringUtils.isNotBlank(columns)) { httpPut.setHeader("columns", columns); } else { - if (ObjectUtils.isNotEmpty(schema)) { + if (schema != null && !schema.isEmpty()) { String dfColumns = Arrays.stream(schema.fieldNames()).collect(Collectors.joining(",")); httpPut.setHeader("columns", dfColumns); }