Skip to content

Commit

Permalink
[feature] add stream load config to add double quotes for field when …
Browse files Browse the repository at this point in the history
…csv format.
  • Loading branch information
CodeCooker17 committed Sep 13, 2023
1 parent 0daf6c4 commit 5448717
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* DorisStreamLoad
**/
public class DorisStreamLoad implements Serializable {
private static final String NULL_VALUE = "\\N";

private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);

Expand All @@ -88,6 +89,7 @@ public class DorisStreamLoad implements Serializable {
private final String columns;
private final String maxFilterRatio;
private final Map<String, String> streamLoadProp;
private boolean addDoubleQuotes;
private static final long cacheExpireTimeout = 4 * 60;
private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
private final String fileType;
Expand All @@ -111,6 +113,11 @@ public DorisStreamLoad(SparkSettings settings) {
fileType = streamLoadProp.getOrDefault("format", "csv");
if ("csv".equals(fileType)) {
FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
if (addDoubleQuotes) {
LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop.");
streamLoadProp.put("trim_double_quotes", "true");
}
} else if ("json".equalsIgnoreCase(fileType)) {
streamLoadProp.put("read_json_by_line", "true");
}
Expand Down Expand Up @@ -189,7 +196,8 @@ public int load(Iterator<InternalRow> rows, StructType schema)
.format(fileType)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
.schema(schema).build(), streamingPassthrough);
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
HttpResponse httpResponse = httpClient.execute(httpPut);
loadResponse = new LoadResponse(httpResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,17 @@ public class RecordBatch {
*/
private final StructType schema;

private final boolean addDoubleQuotes;

private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String format, String sep, byte[] delim,
StructType schema) {
StructType schema, boolean addDoubleQuotes) {
this.iterator = iterator;
this.batchSize = batchSize;
this.format = format;
this.sep = sep;
this.delim = delim;
this.schema = schema;
this.addDoubleQuotes = addDoubleQuotes;
}

public Iterator<InternalRow> getIterator() {
Expand All @@ -94,6 +97,10 @@ public byte[] getDelim() {
public StructType getSchema() {
return schema;
}

public boolean getAddDoubleQuotes(){
return addDoubleQuotes;
}
public static Builder newBuilder(Iterator<InternalRow> iterator) {
return new Builder(iterator);
}
Expand All @@ -115,6 +122,8 @@ public static class Builder {

private StructType schema;

private boolean addDoubleQuotes;

public Builder(Iterator<InternalRow> iterator) {
this.iterator = iterator;
}
Expand Down Expand Up @@ -144,8 +153,13 @@ public Builder schema(StructType schema) {
return this;
}

public Builder addDoubleQuotes(boolean addDoubleQuotes) {
this.addDoubleQuotes = addDoubleQuotes;
return this;
}

public RecordBatch build() {
return new RecordBatch(iterator, batchSize, format, sep, delim, schema);
return new RecordBatch(iterator, batchSize, format, sep, delim, schema, addDoubleQuotes);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ private byte[] rowToByte(InternalRow row) throws DorisException {

switch (recordBatch.getFormat().toLowerCase()) {
case "csv":
bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
if (recordBatch.getAddDoubleQuotes()) {
bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
} else {
bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
}
break;
case "json":
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String se
return builder.toString().getBytes(StandardCharsets.UTF_8);
}

public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, StructType schema, String sep) {
StringBuilder builder = new StringBuilder();
StructField[] fields = schema.fields();
int n = row.numFields();
if (n > 0) {
builder.append("\"").append(SchemaUtils.rowColumnValue(row, 0, fields[0].dataType())).append("\"");
int i = 1;
while (i < n) {
builder.append(sep);
builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, fields[i].dataType())).append("\"");
i++;
}
}
return builder.toString().getBytes(StandardCharsets.UTF_8);
}

public static byte[] rowToJsonBytes(InternalRow row, StructType schema)
throws JsonProcessingException {
StructField[] fields = schema.fields();
Expand Down

0 comments on commit 5448717

Please sign in to comment.