[feature] add stream load config to add double quotes for field when csv format. #119

Merged: 1 commit, Sep 14, 2023
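The new add_double_quotes stream load property makes the connector wrap every CSV field in double quotes before sending, so values that contain the column separator stay in one column; the connector then forces trim_double_quotes to true so Doris strips the quotes again on ingest. A minimal usage sketch follows. The doris.sink.properties.* passthrough prefix and the connection options are assumptions based on the connector's documented options; only add_double_quotes, format, and column_separator come from this diff.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class QuotedCsvWriteExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("doris-quoted-csv-write").getOrCreate();
        // A value that contains the column separator, which plain CSV output would split incorrectly.
        Dataset<Row> df = spark.sql("SELECT 1 AS id, 'a,b' AS name");

        df.write()
                .format("doris")                                              // connector short name (assumed)
                .option("doris.fenodes", "fe_host:8030")                      // placeholder FE address
                .option("doris.table.identifier", "example_db.example_tbl")   // placeholder table
                .option("doris.sink.properties.format", "csv")
                .option("doris.sink.properties.column_separator", ",")
                // New in this PR: quote every field; the connector also sets trim_double_quotes=true.
                .option("doris.sink.properties.add_double_quotes", "true")
                .save();

        spark.stop();
    }
}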
DorisStreamLoad.java

@@ -70,6 +70,7 @@
  * DorisStreamLoad
  **/
 public class DorisStreamLoad implements Serializable {
+    private static final String NULL_VALUE = "\\N";

     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);

@@ -88,6 +89,7 @@ public class DorisStreamLoad implements Serializable {
     private final String columns;
     private final String maxFilterRatio;
     private final Map<String, String> streamLoadProp;
+    private boolean addDoubleQuotes;
     private static final long cacheExpireTimeout = 4 * 60;
     private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
     private final String fileType;
@@ -111,6 +113,11 @@ public DorisStreamLoad(SparkSettings settings) {
         fileType = streamLoadProp.getOrDefault("format", "csv");
         if ("csv".equals(fileType)) {
             FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+            this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
+            if (addDoubleQuotes) {
+                LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop.");
+                streamLoadProp.put("trim_double_quotes", "true");
+            }
         } else if ("json".equalsIgnoreCase(fileType)) {
             streamLoadProp.put("read_json_by_line", "true");
         }
@@ -189,7 +196,8 @@ public int load(Iterator<InternalRow> rows, StructType schema)
                     .format(fileType)
                     .sep(FIELD_DELIMITER)
                     .delim(LINE_DELIMITER)
-                    .schema(schema).build(), streamingPassthrough);
+                    .schema(schema)
+                    .addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
             httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream));
             HttpResponse httpResponse = httpClient.execute(httpPut);
             loadResponse = new LoadResponse(httpResponse);
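The constructor change above couples the two properties: quotes are added on the client side, and trim_double_quotes tells Doris to strip the outermost quotes during stream load, so the stored values come back unquoted. A runnable sketch of just that property handling; the map below is an illustrative stand-in for streamLoadProp.

import java.util.HashMap;
import java.util.Map;

public class AddDoubleQuotesPropSketch {
    public static void main(String[] args) {
        // Illustrative stand-in for the connector's streamLoadProp map.
        Map<String, String> streamLoadProp = new HashMap<>();
        streamLoadProp.put("format", "csv");
        streamLoadProp.put("add_double_quotes", "true");

        boolean addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false"));
        if (addDoubleQuotes) {
            // Client-side quoting plus server-side trimming: the quotes never reach the stored data.
            streamLoadProp.put("trim_double_quotes", "true");
        }

        System.out.println(streamLoadProp.get("trim_double_quotes")); // true
    }
}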
RecordBatch.java

@@ -61,14 +61,17 @@ public class RecordBatch {
      */
     private final StructType schema;

+    private final boolean addDoubleQuotes;
+
     private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String format, String sep, byte[] delim,
-                        StructType schema) {
+                        StructType schema, boolean addDoubleQuotes) {
         this.iterator = iterator;
         this.batchSize = batchSize;
         this.format = format;
         this.sep = sep;
         this.delim = delim;
         this.schema = schema;
+        this.addDoubleQuotes = addDoubleQuotes;
     }

     public Iterator<InternalRow> getIterator() {
@@ -94,6 +97,10 @@ public byte[] getDelim() {
     public StructType getSchema() {
         return schema;
     }
+
+    public boolean getAddDoubleQuotes(){
+        return addDoubleQuotes;
+    }
     public static Builder newBuilder(Iterator<InternalRow> iterator) {
         return new Builder(iterator);
     }
@@ -115,6 +122,8 @@ public static class Builder {

         private StructType schema;

+        private boolean addDoubleQuotes;
+
         public Builder(Iterator<InternalRow> iterator) {
             this.iterator = iterator;
         }
@@ -144,8 +153,13 @@ public Builder schema(StructType schema) {
             return this;
         }

+        public Builder addDoubleQuotes(boolean addDoubleQuotes) {
+            this.addDoubleQuotes = addDoubleQuotes;
+            return this;
+        }
+
         public RecordBatch build() {
-            return new RecordBatch(iterator, batchSize, format, sep, delim, schema);
+            return new RecordBatch(iterator, batchSize, format, sep, delim, schema, addDoubleQuotes);
         }

     }
@@ -200,7 +200,11 @@ private byte[] rowToByte(InternalRow row) throws DorisException {

         switch (recordBatch.getFormat().toLowerCase()) {
             case "csv":
-                bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                if (recordBatch.getAddDoubleQuotes()) {
+                    bytes = DataUtil.rowAddDoubleQuotesToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                } else {
+                    bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
+                }
                 break;
             case "json":
                 try {
DataUtil.java

@@ -51,6 +51,22 @@ public static byte[] rowToCsvBytes(InternalRow row, StructType schema, String sep) {
         return builder.toString().getBytes(StandardCharsets.UTF_8);
     }

+    public static byte[] rowAddDoubleQuotesToCsvBytes(InternalRow row, StructType schema, String sep) {
+        StringBuilder builder = new StringBuilder();
+        StructField[] fields = schema.fields();
+        int n = row.numFields();
+        if (n > 0) {
+            builder.append("\"").append(SchemaUtils.rowColumnValue(row, 0, fields[0].dataType())).append("\"");
+            int i = 1;
+            while (i < n) {
+                builder.append(sep);
+                builder.append("\"").append(SchemaUtils.rowColumnValue(row, i, fields[i].dataType())).append("\"");
+                i++;
+            }
+        }
+        return builder.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
     public static byte[] rowToJsonBytes(InternalRow row, StructType schema)
             throws JsonProcessingException {
         StructField[] fields = schema.fields();
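For a concrete picture of the output, here is a standalone sketch that mirrors the quoting loop above on a plain String[] instead of an InternalRow; the values and separator are made up for illustration.

import java.nio.charset.StandardCharsets;

public class QuotedCsvLineSketch {
    // Mirrors rowAddDoubleQuotesToCsvBytes, minus the InternalRow/SchemaUtils plumbing.
    static byte[] toQuotedCsvBytes(String[] values, String sep) {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                builder.append(sep);
            }
            builder.append("\"").append(values[i]).append("\"");
        }
        return builder.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] line = toQuotedCsvBytes(new String[]{"1", "a,b", "2023-09-14"}, ",");
        // Prints: "1","a,b","2023-09-14"  -- the embedded comma no longer splits the second column.
        System.out.println(new String(line, StandardCharsets.UTF_8));
    }
}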