Skip to content

Commit

Permalink
[Fix] When http reports an error, writing will get stuck (#539)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Jan 10, 2025
1 parent ed4a4c3 commit 1fcc640
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ protected boolean isRedirectable(String method) {
return true;
}
})
.setRetryHandler((exception, executionCount, context) -> false)
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
.setDefaultRequestConfig(
RequestConfig.custom()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class DorisStreamLoad implements Serializable {
private final Properties streamLoadProp;
private final RecordStream recordStream;
private volatile Future<CloseableHttpResponse> pendingLoadFuture;
private volatile Exception httpException = null;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private boolean loadBatchFirstRecord;
Expand All @@ -115,14 +116,18 @@ public DorisStreamLoad(
this.streamLoadProp = executionOptions.getStreamLoadProp();
this.enableDelete = executionOptions.getDeletable();
this.httpClient = httpClient;
String threadName =
String.format(
"stream-load-upload-%s-%s",
labelGenerator.getSubtaskId(), labelGenerator.getTableIdentifier());
this.executorService =
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ExecutorThreadFactory("stream-load-upload"));
new ExecutorThreadFactory(threadName));
this.recordStream =
new RecordStream(
executionOptions.getBufferSize(),
Expand Down Expand Up @@ -250,6 +255,7 @@ public void abortPreCommit(String labelPrefix, long chkID) throws Exception {
* @throws IOException
*/
public void writeRecord(byte[] record) throws IOException {
checkLoadException();
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
} else if (lineDelimiter != null) {
Expand All @@ -258,6 +264,12 @@ public void writeRecord(byte[] record) throws IOException {
recordStream.write(record);
}

private void checkLoadException() {
if (httpException != null) {
throw new RuntimeException("Stream load http request error, ", httpException);
}
}

@VisibleForTesting
public RecordStream getRecordStream() {
return recordStream;
Expand Down Expand Up @@ -347,11 +359,21 @@ public void startLoad(String label, boolean isResume) throws IOException {
} else {
executeMessage = "table " + table + " start execute load for label " + label;
}
Thread mainThread = Thread.currentThread();
pendingLoadFuture =
executorService.submit(
() -> {
LOG.info(executeMessage);
return httpClient.execute(putBuilder.build());
try {
return httpClient.execute(putBuilder.build());
} catch (Exception e) {
LOG.error("Failed to execute load, cause ", e);
httpException = e;
// When an HTTP error occurs, the main thread should be
// interrupted to prevent blocking
mainThread.interrupt();
throw e;
}
});
} catch (Exception e) {
String err;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,12 @@ public String getConcatLabelPrefix() {
String concatPrefix = String.format("%s_%s_%s", labelPrefix, tableIdentifier, subtaskId);
return concatPrefix;
}

public int getSubtaskId() {
return subtaskId;
}

public String getTableIdentifier() {
return tableIdentifier;
}
}

0 comments on commit 1fcc640

Please sign in to comment.