Skip to content

Commit

Permalink
[Improve] DataFrame CSV Stream Load optimization (#137)
Browse files (browse the repository at this point in the history)
  • Loading branch information
huanccwang authored Nov 7, 2023
1 parent e118070 commit a2e682b
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
+ import java.util.stream.Collectors;


/**
Expand Down Expand Up @@ -162,12 +163,17 @@ protected boolean isRedirectable(String method) {
return httpClientBuilder.build();
}

- private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC) {
+ private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC, StructType schema) {
HttpPut httpPut = new HttpPut(loadUrlStr);
addCommonHeader(httpPut);
httpPut.setHeader("label", label);
if (StringUtils.isNotBlank(columns)) {
httpPut.setHeader("columns", columns);
+ } else {
+ if (schema != null && !schema.isEmpty()) {
+ String dfColumns = Arrays.stream(schema.fieldNames()).collect(Collectors.joining(","));
+ httpPut.setHeader("columns", dfColumns);
+ }
}
if (StringUtils.isNotBlank(maxFilterRatio)) {
httpPut.setHeader("max_filter_ratio", maxFilterRatio);
Expand Down Expand Up @@ -210,7 +216,7 @@ public long load(Iterator<InternalRow> rows, StructType schema)
try (CloseableHttpClient httpClient = getHttpClient()) {
String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl);
this.loadUrlStr = loadUrlStr;
- HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC);
+ HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);
RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
.format(fileType)
.sep(FIELD_DELIMITER)
Expand Down

0 comments on commit a2e682b

Please sign in to comment.