From df64983eff338f609affb2623158173f1bbd1d89 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 7 Sep 2023 17:17:35 +0800 Subject: [PATCH] solve conflicting --- .../doris/spark/load/DorisStreamLoad.java | 35 +++++++++++++------ .../org/apache/doris/spark/util/DataUtil.java | 19 ++++++++++ 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 2aaaac15..706912c7 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -27,6 +27,7 @@ import org.apache.doris.spark.util.ResponseUtil; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -49,7 +50,6 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -89,7 +89,6 @@ public class DorisStreamLoad implements Serializable { private String columns; private String maxFilterRatio; private Map streamLoadProp; - private boolean trimDoubleQuotes; private boolean addDoubleQuotes; private static final long cacheExpireTimeout = 4 * 60; private final LoadingCache> cache; @@ -113,8 +112,11 @@ public DorisStreamLoad(SparkSettings settings) { fileType = streamLoadProp.getOrDefault("format", "csv"); if ("csv".equals(fileType)) { FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t")); - this.trimDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("trim_double_quotes", "false")); this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false")); + if (addDoubleQuotes) { + LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop."); + streamLoadProp.put("trim_double_quotes", "true"); + } } else if ("json".equalsIgnoreCase(fileType)) { readJsonByLine = Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false")); boolean stripOuterArray = Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false")); @@ -420,13 +422,23 @@ private List parseLoadData(List> rows, String[] dfColumns) switch (fileType.toUpperCase()) { case "CSV": - loadDataList = Collections.singletonList( - rows.stream() - .map(row -> row.stream() - .map(DataUtil::handleColumnValue) - .map(Object::toString) - .collect(Collectors.joining(FIELD_DELIMITER)) - ).collect(Collectors.joining(LINE_DELIMITER))); + if (addDoubleQuotes) { + loadDataList = Collections.singletonList( + rows.stream() + .map(row -> row.stream() + .map(DataUtil::handleColumnValueAddQuotes) + .map(Object::toString) + .collect(Collectors.joining(FIELD_DELIMITER)) + ).collect(Collectors.joining(LINE_DELIMITER))); + } else { + loadDataList = Collections.singletonList( + rows.stream() + .map(row -> row.stream() + .map(DataUtil::handleColumnValue) + .map(Object::toString) + .collect(Collectors.joining(FIELD_DELIMITER)) + ).collect(Collectors.joining(LINE_DELIMITER))); + } break; case "JSON": List> dataList = new ArrayList<>(); @@ -441,7 +453,8 @@ private List parseLoadData(List> rows, String[] dfColumns) dataList.add(dataMap); } } catch (Exception e) { - throw new StreamLoadException("The number of configured columns does not match the number of data columns."); + throw new StreamLoadException( + "The number of configured columns does not match the number of data columns."); } // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception loadDataList = ListUtils.getSerializedList(dataList, readJsonByLine ? LINE_DELIMITER : null); diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java index 58774474..bfa67061 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java @@ -48,4 +48,23 @@ public static Object handleColumnValue(Object value) { } + public static Object handleColumnValueAddQuotes(Object value) { + + if (value == null) { + return NULL_VALUE; + } + + if (value instanceof Timestamp) { + return value.toString(); + } + + if (value instanceof WrappedArray) { + + Object[] arr = JavaConversions.seqAsJavaList((WrappedArray) value).toArray(); + return Arrays.toString(arr); + } + + return "\"" + value + "\""; + } + }