diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 9ecfa405..0b506b05 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -70,6 +70,7 @@
  * DorisStreamLoad
  **/
 public class DorisStreamLoad implements Serializable {
+    private static final String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -88,6 +89,7 @@ public class DorisStreamLoad implements Serializable {
     private final String columns;
     private final String maxFilterRatio;
     private final Map streamLoadProp;
+    private boolean addDoubleQuotes;
     private static final long cacheExpireTimeout = 4 * 60;
     private final LoadingCache> cache;
     private final String fileType;
@@ -111,6 +113,11 @@ public DorisStreamLoad(SparkSettings settings) {
         fileType = streamLoadProp.getOrDefault("format", "csv");
         if ("csv".equals(fileType)) {
             FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+            this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
+            if (addDoubleQuotes) {
+                LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop.");
+                streamLoadProp.put("trim_double_quotes", "true");
+            }
         } else if ("json".equalsIgnoreCase(fileType)) {
             streamLoadProp.put("read_json_by_line", "true");
         }
@@ -189,7 +196,8 @@ public int load(Iterator rows, StructType schema)
                     .format(fileType)
                     .sep(FIELD_DELIMITER)
                     .delim(LINE_DELIMITER)
-                    .schema(schema).build(), streamingPassthrough);
+                    .schema(schema)
+                    .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
             httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
             HttpResponse httpResponse = httpClient.execute(httpPut);
             loadResponse = new LoadResponse(httpResponse);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
index 779c057d..4ce297f2 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java
@@ -61,14 +61,17 @@ public class RecordBatch {
      */
     private final StructType schema;
 
+    private final boolean addDoubleQuotes;
+
     private RecordBatch(Iterator iterator, int batchSize, String format, String sep, byte[] delim,
-                        StructType schema) {
+                        StructType schema, boolean addDoubleQuotes) {
         this.iterator = iterator;
         this.batchSize = batchSize;
         this.format = format;
         this.sep = sep;
         this.delim = delim;
         this.schema = schema;
+        this.addDoubleQuotes = addDoubleQuotes;
     }
 
     public Iterator getIterator() {
@@ -94,6 +97,10 @@ public byte[] getDelim() {
     public StructType getSchema() {
         return schema;
     }
+
+    public boolean getAddDoubleQuotes(){
+        return addDoubleQuotes;
+    }
     public static Builder newBuilder(Iterator iterator) {
         return new Builder(iterator);
     }
@@ -115,6 +122,8 @@ public static class Builder {
 
         private StructType schema;
 
+        private boolean addDoubleQuotes;
+
         public Builder(Iterator iterator) {
             this.iterator = iterator;
         }
@@ -144,8 +153,13 @@ public Builder schema(StructType schema) {
            this.schema = schema;
            return this;
        }
 
+        public Builder addDoubleQuotes(boolean addDoubleQuotes) {
+            this.addDoubleQuotes = addDoubleQuotes;
+            return this;
+        }
+
        public RecordBatch build() {
-            return new RecordBatch(iterator, batchSize, format, sep, delim, schema);
+            return new RecordBatch(iterator, batchSize, format, sep, delim, schema, addDoubleQuotes);
        }
    }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
index 9444c1da..d7055011 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java
@@ -200,7 +200,11 @@ private byte[] rowToByte(InternalRow row) throws DorisException {
 
         switch (recordBatch.getFormat().toLowerCase()) {
             case "csv":
-                bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                if (recordBatch.getAddDoubleQuotes()) {
+                    bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                } else {
+                    bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                }
                 break;
             case "json":
                 try {
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index aea6ddee..3f53d459 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -51,6 +51,22 @@ public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String se
         return builder.toString().getBytes(StandardCharsets.UTF_8);
     }
 
+    public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, StructType schema, String sep) {
+        StringBuilder builder = new StringBuilder();
+        StructField[] fields = schema.fields();
+        int n = row.numFields();
+        if (n > 0) {
+            builder.append("\"").append(SchemaUtils.rowColumnValue(row, 0, fields[0].dataType())).append("\"");
+            int i = 1;
+            while (i < n) {
+                builder.append(sep);
+                builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, fields[i].dataType())).append("\"");
+                i++;
+            }
+        }
+        return builder.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
     public static byte[] rowToJsonBytes(InternalRow row, StructType schema)
             throws JsonProcessingException {
         StructField[] fields = schema.fields();