From ee94f552e8f56900d375cac30804ac6e94f90bae Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 20 Nov 2024 09:58:55 +0800 Subject: [PATCH] [fix] Fixed the issue that batchwriter may be blocked when writing to multiple tables (#511) --- .../sink/batch/DorisBatchStreamLoad.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 3cfda6041..3747257d1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -178,7 +178,7 @@ public DorisBatchStreamLoad( * @param record * @throws IOException */ - public synchronized void writeRecord(String database, String table, byte[] record) { + public void writeRecord(String database, String table, byte[] record) { checkFlushException(); String bufferKey = getTableIdentifier(database, table); @@ -228,15 +228,15 @@ public synchronized void writeRecord(String database, String table, byte[] recor } } - public synchronized boolean bufferFullFlush(String bufferKey) { + public boolean bufferFullFlush(String bufferKey) { return doFlush(bufferKey, false, true); } - public synchronized boolean intervalFlush() { + public boolean intervalFlush() { return doFlush(null, false, false); } - public synchronized boolean checkpointFlush() { + public boolean checkpointFlush() { return doFlush(null, true, false); } @@ -254,6 +254,10 @@ private synchronized boolean doFlush( } private synchronized boolean flush(String bufferKey, boolean waitUtilDone) { + if (bufferMap.isEmpty()) { + // bufferMap may have been flushed by other threads + return false; + } if (null == bufferKey) { boolean flush = false; for (String key : bufferMap.keySet()) { @@ -270,7 +274,7 @@ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) { } else if (bufferMap.containsKey(bufferKey)) { flushBuffer(bufferKey); } else { - throw new DorisBatchLoadException("buffer not found for key: " + bufferKey); + LOG.warn("buffer not found for key: {}, may be already flushed.", bufferKey); } if (waitUtilDone) { waitAsyncLoadFinish(); @@ -281,6 +285,7 @@ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) { private synchronized void flushBuffer(String bufferKey) { BatchRecordBuffer buffer = bufferMap.get(bufferKey); buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable())); + LOG.debug("flush buffer for key {} with label {}", bufferKey, buffer.getLabelName()); putRecordToFlushQueue(buffer); bufferMap.remove(bufferKey); } @@ -408,11 +413,6 @@ public void run() { load(bf.getLabelName(), bf); } } - - if (flushQueue.size() < flushQueueSize) { - // Avoid waiting for 2 rounds of intervalMs - doFlush(null, false, false); - } } catch (Exception e) { LOG.error("worker running error", e); exception.set(e);