From 4557e8fcf64f294a9f3773d7090306166c0c3429 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 24 Oct 2024 11:51:08 +0800 Subject: [PATCH] [Improve] add thrift max message size options (#233) --- .../apache/doris/spark/backend/BackendClient.java | 13 ++++++++++--- .../doris/spark/cfg/ConfigurationOptions.java | 3 +++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java index b10797b4..04c32880 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java @@ -43,6 +43,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE; +import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT; + /** * Client to request Doris BE */ @@ -58,6 +61,7 @@ public class BackendClient { private final int retries; private final int socketTimeout; private final int connectTimeout; + private final int thriftMaxMessageSize; public BackendClient(Routing routing, Settings settings) throws ConnectedFailedException { this.routing = routing; @@ -67,8 +71,9 @@ public BackendClient(Routing routing, Settings settings) throws ConnectedFailedE ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT); this.retries = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT); - logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", - this.connectTimeout, this.socketTimeout, this.retries); + this.thriftMaxMessageSize = settings.getIntegerProperty(DORIS_THRIFT_MAX_MESSAGE_SIZE, DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT); + logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'", + this.connectTimeout, this.socketTimeout, this.retries, this.thriftMaxMessageSize); open(); } @@ -79,7 +84,9 @@ private void open() throws ConnectedFailedException { logger.debug("Attempt {} to connect {}.", attempt, routing); try { TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); - transport = new TSocket(new TConfiguration(), routing.getHost(), routing.getPort(), socketTimeout, connectTimeout); + TConfiguration.Builder configBuilder = TConfiguration.custom(); + configBuilder.setMaxMessageSize(thriftMaxMessageSize); + transport = new TSocket(configBuilder.build(), routing.getHost(), routing.getPort(), socketTimeout, connectTimeout); TProtocol protocol = factory.getProtocol(transport); client = new TDorisExternalService.Client(protocol); logger.trace("Connect status before open transport to {} is '{}'.", routing, isConnected); diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index 3b9b5540..cf0630f7 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -167,4 +167,7 @@ public interface ConfigurationOptions { String DORIS_ARROW_FLIGHT_SQL_PORT = "doris.arrow-flight-sql.port"; + String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size"; + int DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE; + }