Skip to content

Commit

Permalink
solve conflicting
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeCooker17 committed Sep 7, 2023
1 parent 1feb7fb commit df64983
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.spark.util.ResponseUtil;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
Expand All @@ -49,7 +50,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
Expand Down Expand Up @@ -89,7 +89,6 @@ public class DorisStreamLoad implements Serializable {
private String columns;
private String maxFilterRatio;
private Map<String, String> streamLoadProp;
private boolean trimDoubleQuotes;
private boolean addDoubleQuotes;
private static final long cacheExpireTimeout = 4 * 60;
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
Expand All @@ -113,8 +112,11 @@ public DorisStreamLoad(SparkSettings settings) {
fileType = streamLoadProp.getOrDefault("format", "csv");
if ("csv".equals(fileType)) {
FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
this.trimDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("trim_double_quotes", "false"));
this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
if (addDoubleQuotes) {
LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop.");
streamLoadProp.put("trim_double_quotes", "true");
}
} else if ("json".equalsIgnoreCase(fileType)) {
readJsonByLine = Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false"));
boolean stripOuterArray = Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false"));
Expand Down Expand Up @@ -420,13 +422,23 @@ private List<String> parseLoadData(List<List<Object>> rows, String[] dfColumns)
switch (fileType.toUpperCase()) {

case "CSV":
loadDataList = Collections.singletonList(
rows.stream()
.map(row -> row.stream()
.map(DataUtil::handleColumnValue)
.map(Object::toString)
.collect(Collectors.joining(FIELD_DELIMITER))
).collect(Collectors.joining(LINE_DELIMITER)));
if (addDoubleQuotes) {
loadDataList = Collections.singletonList(
rows.stream()
.map(row -> row.stream()
.map(DataUtil::handleColumnValueAddQuotes)
.map(Object::toString)
.collect(Collectors.joining(FIELD_DELIMITER))
).collect(Collectors.joining(LINE_DELIMITER)));
} else {
loadDataList = Collections.singletonList(
rows.stream()
.map(row -> row.stream()
.map(DataUtil::handleColumnValue)
.map(Object::toString)
.collect(Collectors.joining(FIELD_DELIMITER))
).collect(Collectors.joining(LINE_DELIMITER)));
}
break;
case "JSON":
List<Map<Object, Object>> dataList = new ArrayList<>();
Expand All @@ -441,7 +453,8 @@ private List<String> parseLoadData(List<List<Object>> rows, String[] dfColumns)
dataList.add(dataMap);
}
} catch (Exception e) {
throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
throw new StreamLoadException(
"The number of configured columns does not match the number of data columns.");
}
// split a large collection into normal-sized chunks to avoid the "Requested array size exceeds VM limit" exception
loadDataList = ListUtils.getSerializedList(dataList, readJsonByLine ? LINE_DELIMITER : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,23 @@ public static Object handleColumnValue(Object value) {

}

/**
 * Renders a single column value for CSV output, wrapping ordinary values in double quotes.
 *
 * <ul>
 *   <li>{@code null} is returned as {@code NULL_VALUE}, unquoted.</li>
 *   <li>A {@link Timestamp} is returned as its {@code toString()} form, unquoted.</li>
 *   <li>A {@code WrappedArray} is returned as {@code Arrays.toString} of its elements, unquoted.</li>
 *   <li>Any other value is converted to a string and surrounded by {@code "} characters;
 *       double quotes already present inside the value are not escaped.</li>
 * </ul>
 *
 * @param value the raw column value, may be {@code null}
 * @return the value to be written for this column
 */
public static Object handleColumnValueAddQuotes(Object value) {
    if (value == null) {
        return NULL_VALUE;
    } else if (value instanceof Timestamp) {
        return value.toString();
    } else if (value instanceof WrappedArray) {
        // Convert the Scala sequence to a Java array so it can be printed with Arrays.toString.
        return Arrays.toString(JavaConversions.seqAsJavaList((WrappedArray) value).toArray());
    }

    // Everything else is emitted as a double-quoted field.
    final String quote = "\"";
    return quote + value + quote;
}

}

0 comments on commit df64983

Please sign in to comment.