diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java index 53d3ce13b..d1600de69 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java @@ -60,6 +60,7 @@ protected boolean isRedirectable(String method) { return true; } }) + .setRetryHandler((exception, executionCount, context) -> false) .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE) .setDefaultRequestConfig( RequestConfig.custom() diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index f900f7418..c6e393265 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -89,6 +89,7 @@ public class DorisStreamLoad implements Serializable { private final Properties streamLoadProp; private final RecordStream recordStream; private volatile Future pendingLoadFuture; + private volatile Exception httpException = null; private final CloseableHttpClient httpClient; private final ExecutorService executorService; private boolean loadBatchFirstRecord; @@ -115,6 +116,10 @@ public DorisStreamLoad( this.streamLoadProp = executionOptions.getStreamLoadProp(); this.enableDelete = executionOptions.getDeletable(); this.httpClient = httpClient; + String threadName = + String.format( + "stream-load-upload-%s-%s", + labelGenerator.getSubtaskId(), labelGenerator.getTableIdentifier()); this.executorService = new ThreadPoolExecutor( 1, @@ -122,7 +127,7 @@ public DorisStreamLoad( 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - new ExecutorThreadFactory("stream-load-upload")); + new ExecutorThreadFactory(threadName)); this.recordStream = new RecordStream( executionOptions.getBufferSize(), @@ -250,6 +255,7 @@ public void abortPreCommit(String labelPrefix, long chkID) throws Exception { * @throws IOException */ public void writeRecord(byte[] record) throws IOException { + checkLoadException(); if (loadBatchFirstRecord) { loadBatchFirstRecord = false; } else if (lineDelimiter != null) { @@ -258,6 +264,12 @@ public void writeRecord(byte[] record) throws IOException { recordStream.write(record); } + private void checkLoadException() { + if (httpException != null) { + throw new RuntimeException("Stream load http request error, ", httpException); + } + } + @VisibleForTesting public RecordStream getRecordStream() { return recordStream; @@ -347,11 +359,21 @@ public void startLoad(String label, boolean isResume) throws IOException { } else { executeMessage = "table " + table + " start execute load for label " + label; } + Thread mainThread = Thread.currentThread(); pendingLoadFuture = executorService.submit( () -> { LOG.info(executeMessage); - return httpClient.execute(putBuilder.build()); + try { + return httpClient.execute(putBuilder.build()); + } catch (Exception e) { + LOG.error("Failed to execute load, cause ", e); + httpException = e; + // When an HTTP error occurs, the main thread should be + // interrupted to prevent blocking + mainThread.interrupt(); + throw e; + } }); } catch (Exception e) { String err; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java index d80315f55..84c14b793 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java @@ -91,4 +91,12 @@ public String getConcatLabelPrefix() { String concatPrefix = String.format("%s_%s_%s", labelPrefix, tableIdentifier, subtaskId); return concatPrefix; } + + public int getSubtaskId() { + return subtaskId; + } + + public String getTableIdentifier() { + return tableIdentifier; + } }