Skip to content

Commit

Permalink
[Improve] add thrift max message size options (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Oct 24, 2024
1 parent f720167 commit 4557e8f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT;

/**
* Client to request Doris BE
*/
Expand All @@ -58,6 +61,7 @@ public class BackendClient {
private final int retries;
private final int socketTimeout;
private final int connectTimeout;
private final int thriftMaxMessageSize;

public BackendClient(Routing routing, Settings settings) throws ConnectedFailedException {
this.routing = routing;
Expand All @@ -67,8 +71,9 @@ public BackendClient(Routing routing, Settings settings) throws ConnectedFailedE
ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT);
this.retries = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,
ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT);
logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
this.connectTimeout, this.socketTimeout, this.retries);
this.thriftMaxMessageSize = settings.getIntegerProperty(DORIS_THRIFT_MAX_MESSAGE_SIZE, DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'. thrift MAX_MESSAGE_SIZE set to '{}'",
this.connectTimeout, this.socketTimeout, this.retries, this.thriftMaxMessageSize);
open();
}

Expand All @@ -79,7 +84,9 @@ private void open() throws ConnectedFailedException {
logger.debug("Attempt {} to connect {}.", attempt, routing);
try {
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
transport = new TSocket(new TConfiguration(), routing.getHost(), routing.getPort(), socketTimeout, connectTimeout);
TConfiguration.Builder configBuilder = TConfiguration.custom();
configBuilder.setMaxMessageSize(thriftMaxMessageSize);
transport = new TSocket(configBuilder.build(), routing.getHost(), routing.getPort(), socketTimeout, connectTimeout);
TProtocol protocol = factory.getProtocol(transport);
client = new TDorisExternalService.Client(protocol);
logger.trace("Connect status before open transport to {} is '{}'.", routing, isConnected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,7 @@ public interface ConfigurationOptions {

String DORIS_ARROW_FLIGHT_SQL_PORT = "doris.arrow-flight-sql.port";

String DORIS_THRIFT_MAX_MESSAGE_SIZE = "doris.thrift.max.message.size";
int DORIS_THRIFT_MAX_MESSAGE_SIZE_DEFAULT = Integer.MAX_VALUE;

}

0 comments on commit 4557e8f

Please sign in to comment.