Skip to content

Commit

Permalink
Rework MSE query throttling to take into account estimated number of …
Browse files Browse the repository at this point in the history
…threads used by a query (#14847)
  • Loading branch information
yashmayya authored Jan 24, 2025
1 parent c9e08f3 commit c239952
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,11 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}

Timer queryTimer = new Timer(queryTimeoutMs);
int estimatedNumQueryThreads = dispatchableSubPlan.getEstimatedNumQueryThreads();
try {
// It's fine to block in this thread because we use a separate thread pool from the main Jersey server to process
// these requests.
if (!_queryThrottler.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) {
if (!_queryThrottler.tryAcquire(estimatedNumQueryThreads, queryTimeoutMs, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, query);
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
Expand Down Expand Up @@ -322,7 +323,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

return brokerResponse;
} finally {
_queryThrottler.release();
_queryThrottler.release(estimatedNumQueryThreads);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@

/**
* This class helps limit the number of multi-stage queries being executed concurrently. Note that the cluster
* configuration is a "per server" value and the broker currently simply assumes that a query will be across all
* servers. Another assumption here is that queries are evenly distributed across brokers.
* configuration is a "per server" value and the broker currently computes the max server query threads as
* <em>CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS * numServers / numBrokers</em>. Note that the config value,
* number of servers, and number of brokers are all dynamically updated here.
* <p>
* Another assumption made here is that queries are evenly distributed across brokers.
* <p>
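* This is designed to limit the number of multi-stage queries being concurrently executed across a cluster and is not
* intended to prevent individual large queries from being executed. The one exception is a query whose estimated
* number of threads exceeds the total number of permits currently available to this throttler: such a query is
* rejected immediately because it could never be dispatched.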
*/
public class MultiStageQueryThrottler implements ClusterChangeHandler {

Expand All @@ -50,10 +56,10 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
private int _numBrokers;
private int _numServers;
/**
* If _maxConcurrentQueries is <= 0, it means that the cluster is not configured to limit the number of multi-stage
* If _maxServerQueryThreads is <= 0, it means that the cluster is not configured to limit the number of multi-stage
* queries that can be executed concurrently. In this case, we should not block the query.
*/
private int _maxConcurrentQueries;
private int _maxServerQueryThreads;
private AdjustableSemaphore _semaphore;

@Override
Expand All @@ -63,11 +69,11 @@ public void init(HelixManager helixManager) {
_helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();

_maxConcurrentQueries = Integer.parseInt(
_maxServerQueryThreads = Integer.parseInt(
_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS));

List<String> clusterInstances = _helixAdmin.getInstancesInCluster(_helixManager.getClusterName());
_numBrokers = Math.max(1, (int) clusterInstances.stream()
Expand All @@ -77,36 +83,49 @@ public void init(HelixManager helixManager) {
.filter(instance -> instance.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))
.count());

if (_maxConcurrentQueries > 0) {
_semaphore = new AdjustableSemaphore(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers), true);
if (_maxServerQueryThreads > 0) {
_semaphore = new AdjustableSemaphore(Math.max(1, _maxServerQueryThreads * _numServers / _numBrokers), true);
}
}

/**
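* Returns true if the query can be executed (waiting up to the given timeout if necessary), false if the timeout
* elapses before the query can be executed. Returns true immediately when throttling is disabled.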
* <p>
* {@link #release()} should be called after the query is done executing. It is the responsibility of the caller to
* ensure that {@link #release()} is called exactly once for each call to this method.
* {@link #release(int)} should be called after the query is done executing. It is the responsibility of the caller to
* ensure that {@link #release(int)} is called exactly once for each call to this method.
*
* @param numQueryThreads the estimated number of server threads the query will use; this is the number of permits
* that will be acquired
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
*
* @throws InterruptedException if the current thread is interrupted
* @throws RuntimeException if the query can never be dispatched due to the number of estimated query server threads
* being greater than the maximum number of server query threads calculated on the basis of
* <em>CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS * numServers / numBrokers</em>
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
public boolean tryAcquire(int numQueryThreads, long timeout, TimeUnit unit)
throws InterruptedException {
if (_maxConcurrentQueries <= 0) {
if (_maxServerQueryThreads <= 0) {
return true;
}
return _semaphore.tryAcquire(timeout, unit);

if (numQueryThreads > _semaphore.getTotalPermits()) {
throw new RuntimeException(
"Can't dispatch query because the estimated number of server threads for this query is too large for the "
+ "configured value of '" + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS
+ "'. Consider increasing the value of this configuration");
}

return _semaphore.tryAcquire(numQueryThreads, timeout, unit);
}

/**
* Should be called after the query is done executing. It is the responsibility of the caller to ensure that this
* method is called exactly once for each call to {@link #tryAcquire(long, TimeUnit)}.
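* method is called exactly once for each call to {@link #tryAcquire(int, long, TimeUnit)}, and that the same
* {@code numQueryThreads} value is passed to both so that every acquired permit is returned.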
*/
public void release() {
if (_maxConcurrentQueries > 0) {
_semaphore.release();
public void release(int numQueryThreads) {
if (_maxServerQueryThreads > 0) {
_semaphore.release(numQueryThreads);
}
}

Expand All @@ -128,34 +147,33 @@ public void processClusterChange(HelixConstants.ChangeType changeType) {
if (numBrokers != _numBrokers || numServers != _numServers) {
_numBrokers = numBrokers;
_numServers = numServers;
if (_maxConcurrentQueries > 0) {
_semaphore.setPermits(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers));
if (_maxServerQueryThreads > 0) {
_semaphore.setPermits(Math.max(1, _maxServerQueryThreads * _numServers / _numBrokers));
}
}
} else {
int maxConcurrentQueries = Integer.parseInt(
_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
int maxServerQueryThreads = Integer.parseInt(_helixAdmin.getConfig(_helixConfigScope,
Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
.getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS));

if (_maxConcurrentQueries == maxConcurrentQueries) {
if (_maxServerQueryThreads == maxServerQueryThreads) {
return;
}

if (_maxConcurrentQueries <= 0 && maxConcurrentQueries > 0
|| _maxConcurrentQueries > 0 && maxConcurrentQueries <= 0) {
if (_maxServerQueryThreads <= 0 && maxServerQueryThreads > 0
|| _maxServerQueryThreads > 0 && maxServerQueryThreads <= 0) {
// This operation isn't safe to do while queries are running so we require a restart of the broker for this
// change to take effect.
LOGGER.warn("Enabling or disabling limitation of the maximum number of multi-stage queries running "
+ "concurrently requires a restart of the broker to take effect");
return;
}

if (maxConcurrentQueries > 0) {
_semaphore.setPermits(Math.max(1, maxConcurrentQueries * _numServers / _numBrokers));
if (maxServerQueryThreads > 0) {
_semaphore.setPermits(Math.max(1, maxServerQueryThreads * _numServers / _numBrokers));
}
_maxConcurrentQueries = maxConcurrentQueries;
_maxServerQueryThreads = maxServerQueryThreads;
}
}

Expand Down
Loading

0 comments on commit c239952

Please sign in to comment.