diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 60d1a366a104..bac93bba677e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -238,10 +238,11 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } Timer queryTimer = new Timer(queryTimeoutMs); + int estimatedNumQueryThreads = dispatchableSubPlan.getEstimatedNumQueryThreads(); try { // It's fine to block in this thread because we use a separate thread pool from the main Jersey server to process // these requests. - if (!_queryThrottler.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) { + if (!_queryThrottler.tryAcquire(estimatedNumQueryThreads, queryTimeoutMs, TimeUnit.MILLISECONDS)) { LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, query); requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE); return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); @@ -322,7 +323,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO return brokerResponse; } finally { - _queryThrottler.release(); + _queryThrottler.release(estimatedNumQueryThreads); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java index a6ca713b19f4..420d8da348a2 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottler.java @@ -37,8 +37,14 @@ /** * This class helps limit the number of multi-stage queries being executed concurrently. Note that the cluster - * configuration is a "per server" value and the broker currently simply assumes that a query will be across all - * servers. Another assumption here is that queries are evenly distributed across brokers. + * configuration is a "per server" value and the broker currently computes the max server query threads as + * CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS * numServers / numBrokers. Note that the config value, + * number of servers, and number of brokers are all dynamically updated here. + *
+ * Another assumption made here is that queries are evenly distributed across brokers. + *
+ * This is designed to limit the number of multi-stage queries being concurrently executed across a cluster and is not
+ * intended to prevent individual large queries from being executed.
*/
public class MultiStageQueryThrottler implements ClusterChangeHandler {
@@ -50,10 +56,10 @@ public class MultiStageQueryThrottler implements ClusterChangeHandler {
private int _numBrokers;
private int _numServers;
/**
- * If _maxConcurrentQueries is <= 0, it means that the cluster is not configured to limit the number of multi-stage
+ * If _maxServerQueryThreads is <= 0, it means that the cluster is not configured to limit the number of multi-stage
* queries that can be executed concurrently. In this case, we should not block the query.
*/
- private int _maxConcurrentQueries;
+ private int _maxServerQueryThreads;
private AdjustableSemaphore _semaphore;
@Override
@@ -63,11 +69,11 @@ public void init(HelixManager helixManager) {
_helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
_helixManager.getClusterName()).build();
- _maxConcurrentQueries = Integer.parseInt(
+ _maxServerQueryThreads = Integer.parseInt(
_helixAdmin.getConfig(_helixConfigScope,
- Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
- .getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
- CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
+ Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
+ .getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
+ CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS));
List
- * {@link #release()} should be called after the query is done executing. It is the responsibility of the caller to
- * ensure that {@link #release()} is called exactly once for each call to this method.
+ * {@link #release(int)} should be called after the query is done executing. It is the responsibility of the caller to
+ * ensure that {@link #release(int)} is called exactly once for each call to this method.
*
+ * @param numQueryThreads the estimated number of query server threads
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
+ *
* @throws InterruptedException if the current thread is interrupted
+ * @throws RuntimeException if the query can never be dispatched due to the number of estimated query server threads
+ * being greater than the maximum number of server query threads calculated on the basis of
+ * CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS * numServers / numBrokers
*/
- public boolean tryAcquire(long timeout, TimeUnit unit)
+ public boolean tryAcquire(int numQueryThreads, long timeout, TimeUnit unit)
throws InterruptedException {
- if (_maxConcurrentQueries <= 0) {
+ if (_maxServerQueryThreads <= 0) {
return true;
}
- return _semaphore.tryAcquire(timeout, unit);
+
+ if (numQueryThreads > _semaphore.getTotalPermits()) {
+ throw new RuntimeException(
+ "Can't dispatch query because the estimated number of server threads for this query is too large for the "
+ + "configured value of '" + CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS
+ + "'. Consider increasing the value of this configuration");
+ }
+
+ return _semaphore.tryAcquire(numQueryThreads, timeout, unit);
}
/**
* Should be called after the query is done executing. It is the responsibility of the caller to ensure that this
- * method is called exactly once for each call to {@link #tryAcquire(long, TimeUnit)}.
+ * method is called exactly once for each call to {@link #tryAcquire(int, long, TimeUnit)}.
*/
- public void release() {
- if (_maxConcurrentQueries > 0) {
- _semaphore.release();
+ public void release(int numQueryThreads) {
+ if (_maxServerQueryThreads > 0) {
+ _semaphore.release(numQueryThreads);
}
}
@@ -128,23 +147,22 @@ public void processClusterChange(HelixConstants.ChangeType changeType) {
if (numBrokers != _numBrokers || numServers != _numServers) {
_numBrokers = numBrokers;
_numServers = numServers;
- if (_maxConcurrentQueries > 0) {
- _semaphore.setPermits(Math.max(1, _maxConcurrentQueries * _numServers / _numBrokers));
+ if (_maxServerQueryThreads > 0) {
+ _semaphore.setPermits(Math.max(1, _maxServerQueryThreads * _numServers / _numBrokers));
}
}
} else {
- int maxConcurrentQueries = Integer.parseInt(
- _helixAdmin.getConfig(_helixConfigScope,
- Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))
- .getOrDefault(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
- CommonConstants.Helix.DEFAULT_MAX_CONCURRENT_MULTI_STAGE_QUERIES));
+ int maxServerQueryThreads = Integer.parseInt(_helixAdmin.getConfig(_helixConfigScope,
+ Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS))
+ .getOrDefault(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS,
+ CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS));
- if (_maxConcurrentQueries == maxConcurrentQueries) {
+ if (_maxServerQueryThreads == maxServerQueryThreads) {
return;
}
- if (_maxConcurrentQueries <= 0 && maxConcurrentQueries > 0
- || _maxConcurrentQueries > 0 && maxConcurrentQueries <= 0) {
+ if (_maxServerQueryThreads <= 0 && maxServerQueryThreads > 0
+ || _maxServerQueryThreads > 0 && maxServerQueryThreads <= 0) {
// This operation isn't safe to do while queries are running so we require a restart of the broker for this
// change to take effect.
LOGGER.warn("Enabling or disabling limitation of the maximum number of multi-stage queries running "
@@ -152,10 +170,10 @@ public void processClusterChange(HelixConstants.ChangeType changeType) {
return;
}
- if (maxConcurrentQueries > 0) {
- _semaphore.setPermits(Math.max(1, maxConcurrentQueries * _numServers / _numBrokers));
+ if (maxServerQueryThreads > 0) {
+ _semaphore.setPermits(Math.max(1, maxServerQueryThreads * _numServers / _numBrokers));
}
- _maxConcurrentQueries = maxConcurrentQueries;
+ _maxServerQueryThreads = maxServerQueryThreads;
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
index fe2a5a124006..fcd5e3d7d0ff 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageQueryThrottlerTest.java
@@ -52,8 +52,9 @@ public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
when(_helixManager.getClusterManagmentTool()).thenReturn(_helixAdmin);
when(_helixManager.getClusterName()).thenReturn("testCluster");
- when(_helixAdmin.getConfig(any(), any())).thenReturn(
- Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4"));
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "4"));
when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
List.of("Broker_0", "Broker_1", "Server_0", "Server_1"));
}
@@ -70,9 +71,9 @@ public void testBasicAcquireRelease()
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
- _multiStageQueryThrottler.release();
+ _multiStageQueryThrottler.release(1);
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
}
@@ -80,30 +81,31 @@ public void testBasicAcquireRelease()
public void testAcquireTimeout()
throws Exception {
when(_helixAdmin.getConfig(any(),
- eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))).thenReturn(
- Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2"));
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "2"));
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
}
@Test
public void testDisabledThrottling()
throws Exception {
- when(_helixAdmin.getConfig(any(), any())).thenReturn(
- Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ when(_helixAdmin.getConfig(any(),
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "-1"));
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
// If maxConcurrentQueries is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
// succeed
for (int i = 0; i < 100; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(10, 100, TimeUnit.MILLISECONDS));
}
}
@@ -113,10 +115,10 @@ public void testIncreaseNumBrokers()
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- for (int i = 0; i < 4; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
// Increase the number of brokers
@@ -126,13 +128,13 @@ public void testIncreaseNumBrokers()
// Verify that the number of permits on this broker have been reduced to account for the new brokers
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -2);
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
- for (int i = 0; i < 4; i++) {
- _multiStageQueryThrottler.release();
+ for (int i = 0; i < 2; i++) {
+ _multiStageQueryThrottler.release(2);
}
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
}
@Test
@@ -141,10 +143,10 @@ public void testDecreaseNumBrokers()
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- for (int i = 0; i < 4; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
// Decrease the number of brokers
@@ -153,8 +155,8 @@ public void testDecreaseNumBrokers()
// Ensure that the permits from the removed broker are added to this one.
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
- Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(3, 100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
}
@Test
@@ -163,10 +165,10 @@ public void testIncreaseNumServers()
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- for (int i = 0; i < 4; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
// Increase the number of servers
@@ -176,8 +178,8 @@ public void testIncreaseNumServers()
// Ensure that the permits on this broker are increased to account for the new server
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
- Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
}
@Test
@@ -186,10 +188,10 @@ public void testDecreaseNumServers()
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- for (int i = 0; i < 4; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
// Decrease the number of servers
@@ -198,63 +200,61 @@ public void testDecreaseNumServers()
// Verify that the number of permits on this broker have been reduced to account for the removed server
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -2);
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
- for (int i = 0; i < 4; i++) {
- _multiStageQueryThrottler.release();
+ for (int i = 0; i < 2; i++) {
+ _multiStageQueryThrottler.release(2);
}
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
@Test
- public void testIncreaseMaxConcurrentQueries()
+ public void testIncreaseMaxServerQueryThreads()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- for (int i = 0; i < 4; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
// Increase the value of cluster config maxConcurrentQueries
- when(_helixAdmin.getConfig(any(),
- eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES))))
- .thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "8"));
+ when(_helixAdmin.getConfig(any(), any()))
+ .thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "8"));
_multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
@Test
- public void testDecreaseMaxConcurrentQueries()
+ public void testDecreaseMaxServerQueryThreads()
throws Exception {
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- for (int i = 0; i < 4; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
// Decrease the value of cluster config maxConcurrentQueries
- when(_helixAdmin.getConfig(any(),
- eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
- ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "3"));
+ when(_helixAdmin.getConfig(any(), any())).thenReturn(
+ Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "3"));
_multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), -1);
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
- for (int i = 0; i < 4; i++) {
- _multiStageQueryThrottler.release();
+ for (int i = 0; i < 2; i++) {
+ _multiStageQueryThrottler.release(2);
}
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 3);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
}
@Test
@@ -266,63 +266,78 @@ public void testEnabledToDisabledTransitionDisallowed()
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
// Disable the throttling mechanism via cluster config change
- when(_helixAdmin.getConfig(any(),
- eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
- ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ when(_helixAdmin.getConfig(any(), any())).thenReturn(
+ Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "-1"));
_multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
// Should not be allowed to disable the throttling mechanism if it is enabled during startup
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
for (int i = 0; i < 4; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
}
Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(1, 100, TimeUnit.MILLISECONDS));
}
@Test
public void testDisabledToEnabledTransitionDisallowed()
throws Exception {
when(_helixAdmin.getConfig(any(),
- eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
- ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "-1"));
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "-1"));
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- // If maxConcurrentQueries is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
+ // If maxServerQueryThreads is <= 0, the throttling mechanism should be "disabled" and any attempt to acquire should
// succeed
for (int i = 0; i < 100; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(10, 100, TimeUnit.MILLISECONDS));
}
// Enable the throttling mechanism via cluster config change
when(_helixAdmin.getConfig(any(),
- eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
- ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "4"));
+ eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS)))
+ ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "4"));
_multiStageQueryThrottler.processClusterChange(HelixConstants.ChangeType.CLUSTER_CONFIG);
// Should not be allowed to enable the throttling mechanism if it is disabled during startup
for (int i = 0; i < 100; i++) {
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(10, 100, TimeUnit.MILLISECONDS));
}
}
@Test
- public void testMaxConcurrentQueriesSmallerThanNumBrokers()
+ public void testLowMaxServerQueryThreads() {
+ _multiStageQueryThrottler = new MultiStageQueryThrottler();
+ _multiStageQueryThrottler.init(_helixManager);
+
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+ // Thrown if the estimated number of query threads is greater than the number of available permits to this broker
+ Assert.assertThrows(RuntimeException.class,
+ () -> _multiStageQueryThrottler.tryAcquire(10, 100, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testAcquireReleaseWithDifferentQuerySizes()
throws Exception {
- when(_helixAdmin.getConfig(any(),
- eq(Collections.singletonList(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES)))
- ).thenReturn(Map.of(CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES, "2"));
- when(_helixAdmin.getInstancesInCluster(eq("testCluster"))).thenReturn(
- List.of("Broker_0", "Broker_1", "Broker_2", "Broker_3", "Server_0", "Server_1"));
_multiStageQueryThrottler = new MultiStageQueryThrottler();
_multiStageQueryThrottler.init(_helixManager);
- // The total permits should be capped at 1 even though maxConcurrentQueries * numServers / numBrokers is 0.
- Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 1);
- Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
- Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 0);
- Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 4);
+
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(_multiStageQueryThrottler.availablePermits(), 2);
+
+ // A query with more than 2 threads shouldn't be permitted but a query with 2 threads should be permitted
+ Assert.assertFalse(_multiStageQueryThrottler.tryAcquire(3, 100, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(2, 100, TimeUnit.MILLISECONDS));
+
+ // Release the permits
+ _multiStageQueryThrottler.release(2);
+ _multiStageQueryThrottler.release(2);
+
+ // The query with more than 2 threads should now be permitted
+ Assert.assertTrue(_multiStageQueryThrottler.tryAcquire(3, 100, TimeUnit.MILLISECONDS));
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java b/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
index 2bbc25e42a0d..b6a3408e49ac 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/concurrency/AdjustableSemaphore.java
@@ -27,7 +27,7 @@
*/
public class AdjustableSemaphore extends Semaphore {
- private int _totalPermits;
+ private volatile int _totalPermits;
public AdjustableSemaphore(int permits) {
super(permits);
@@ -39,6 +39,9 @@ public AdjustableSemaphore(int permits, boolean fair) {
_totalPermits = permits;
}
+ /**
+ * Sets the total number of permits to the given value without blocking.
+ */
public void setPermits(int permits) {
Preconditions.checkArgument(permits > 0, "Permits must be a positive integer");
if (permits < _totalPermits) {
@@ -48,4 +51,12 @@ public void setPermits(int permits) {
}
_totalPermits = permits;
}
+
+ /**
+ * Returns the total number of permits (as opposed to just the number of available permits returned by
+ * {@link #availablePermits()}).
+ */
+ public int getTotalPermits() {
+ return _totalPermits;
+ }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 4164b488d320..0b1f0abc8038 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -87,13 +87,13 @@ public void setUp()
startZk();
startController();
- // Set the max concurrent multi-stage queries to 5 for the cluster, so that we can test the query queueing logic
+ // Set the multi-stage max server query threads for the cluster, so that we can test the query queueing logic
// in the MultiStageBrokerRequestHandler
HelixConfigScope scope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
.build();
- _helixManager.getConfigAccessor().set(scope, CommonConstants.Helix.CONFIG_OF_MAX_CONCURRENT_MULTI_STAGE_QUERIES,
- "5");
+ _helixManager.getConfigAccessor()
+ .set(scope, CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_MAX_SERVER_QUERY_THREADS, "10");
startBroker();
startServer();
@@ -717,7 +717,8 @@ public void testMultiValueColumnGroupBy()
}
@Test
- public void testVariadicFunction() throws Exception {
+ public void testVariadicFunction()
+ throws Exception {
String sqlQuery = "SELECT ARRAY_TO_MV(VALUE_IN(RandomAirports, 'MFR', 'SUN', 'GTR')) as airport, count(*) "
+ "FROM mytable WHERE ARRAY_TO_MV(RandomAirports) IN ('MFR', 'SUN', 'GTR') GROUP BY airport";
JsonNode jsonNode = postQuery(sqlQuery);
@@ -729,7 +730,8 @@ public void testVariadicFunction() throws Exception {
@Test(dataProvider = "polymorphicScalarComparisonFunctionsDataProvider")
public void testPolymorphicScalarComparisonFunctions(String type, String literal, String lesserLiteral,
- Object expectedValue) throws Exception {
+ Object expectedValue)
+ throws Exception {
// Queries written this way will trigger the PinotEvaluateLiteralRule which will call the scalar comparison function
// on the literals. Simpler queries like SELECT ... WHERE 'test' = 'test' will not trigger the optimization rule
@@ -770,7 +772,8 @@ public void testPolymorphicScalarComparisonFunctions(String type, String literal
}
@Test
- public void testPolymorphicScalarComparisonFunctionsDifferentType() throws Exception {
+ public void testPolymorphicScalarComparisonFunctionsDifferentType()
+ throws Exception {
// Don't support comparison for literals with different types
String sqlQueryPrefix = "WITH data as (SELECT 1 as \"foo\" FROM mytable) "
+ "SELECT * FROM data WHERE \"foo\" ";
@@ -816,8 +819,10 @@ Object[][] polymorphicScalarComparisonFunctionsDataProvider() {
inputs.add(new Object[]{"FLOAT", "CAST(1.234 AS FLOAT)", "CAST(1.23 AS FLOAT)", "1.234"});
inputs.add(new Object[]{"DOUBLE", "1.234", "1.23", "1.234"});
inputs.add(new Object[]{"BOOLEAN", "CAST(true AS BOOLEAN)", "CAST(FALSE AS BOOLEAN)", "true"});
- inputs.add(new Object[]{"TIMESTAMP", "CAST(1723593600000 AS TIMESTAMP)", "CAST (1623593600000 AS TIMESTAMP)",
- new DateTime(1723593600000L, DateTimeZone.getDefault()).toString("yyyy-MM-dd HH:mm:ss.S")});
+ inputs.add(new Object[]{
+ "TIMESTAMP", "CAST(1723593600000 AS TIMESTAMP)", "CAST (1623593600000 AS TIMESTAMP)",
+ new DateTime(1723593600000L, DateTimeZone.getDefault()).toString("yyyy-MM-dd HH:mm:ss.S")
+ });
return inputs.toArray(new Object[0][]);
}
@@ -943,7 +948,8 @@ public void testSearch()
}
@Test
- public void testLiteralFilterReduce() throws Exception {
+ public void testLiteralFilterReduce()
+ throws Exception {
String sqlQuery = "SELECT * FROM (SELECT CASE WHEN AirTime > 0 THEN 'positive' ELSE 'negative' END AS AirTime "
+ "FROM mytable) WHERE AirTime IN ('positive', 'negative')";
JsonNode jsonNode = postQuery(sqlQuery);
@@ -1096,7 +1102,8 @@ public void testNullIf()
}
@Test
- public void testMVNumericCastInFilter() throws Exception {
+ public void testMVNumericCastInFilter()
+ throws Exception {
String sqlQuery = "SELECT COUNT(*) FROM mytable WHERE ARRAY_TO_MV(CAST(DivAirportIDs AS BIGINT ARRAY)) > 0";
JsonNode jsonNode = postQuery(sqlQuery);
assertNoError(jsonNode);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
index c1cfb89ab7fc..5299b08ce7af 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.calcite.runtime.PairList;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
/**
@@ -82,4 +83,38 @@ public Set