From 2bc84d65303a8020f9e14a8ac0db0f21ed00d472 Mon Sep 17 00:00:00 2001 From: Bolek Ziobrowski <26925920+bziobrowski@users.noreply.github.com> Date: Wed, 22 Jan 2025 00:00:35 +0100 Subject: [PATCH 1/4] Fix application qps quota stalls. --- ...lixExternalViewBasedQueryQuotaManager.java | 84 ++++++++++++------- .../PinotApplicationQuotaRestletResource.java | 9 +- .../QueryQuotaClusterIntegrationTest.java | 2 +- 3 files changed, 58 insertions(+), 37 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 48c5c33d0a7a..b0d73f176549 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -74,6 +74,12 @@ * - broker added or removed from cluster */ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager { + + // Minimum 'working' value for app quota. If actual value is less than this (e.g. 0.0), it is considered as disabled. + private static final double MIN_APP_QUOTA = Math.nextUp(0.0); + // standard value meaning - no app quota limit set + private static final double DISABLED_APP_QUOTA = -1; + private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class); private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1; private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60; @@ -130,9 +136,10 @@ private void initializeApplicationQpsQuotas() { String appName = entry.getKey(); double appQpsQuota = - entry.getValue() != null && entry.getValue() != -1.0d ? entry.getValue() : _defaultQpsQuotaForApplication; + entry.getValue() != null && entry.getValue() >= MIN_APP_QUOTA ? entry.getValue() + : _defaultQpsQuotaForApplication; - if (appQpsQuota < 0) { + if (appQpsQuota < MIN_APP_QUOTA) { buildEmptyOrResetApplicationRateLimiter(appName); continue; } @@ -348,16 +355,38 @@ private synchronized void createOrUpdateDatabaseRateLimiter(List databas } public synchronized void createOrUpdateApplicationRateLimiter(String applicationName) { - createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); + createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), DISABLED_APP_QUOTA); + } + + public synchronized void createOrUpdateApplicationRateLimiter(String applicationName, double override) { + createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), override); } // Caller method need not worry about getting lock on _applicationRateLimiterMap // as this method will do idempotent updates to the application rate limiters - private synchronized void createOrUpdateApplicationRateLimiter(List applicationNames) { + private synchronized void createOrUpdateApplicationRateLimiter(List applicationNames, double override) { ExternalView brokerResource = getBrokerResource(); + Map quotas = null; + boolean quotasInitialized = false; + for (String appName : applicationNames) { - double qpsQuota = getEffectiveQueryQuotaOnApplication(appName); - if (qpsQuota < 0) { + double qpsQuota; + if (override >= MIN_APP_QUOTA) { + qpsQuota = override; + } else { + if (!quotasInitialized) { + quotas = ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); + quotasInitialized = true; + } + + if (quotas != null && quotas.get(appName) != null && quotas.get(appName) >= MIN_APP_QUOTA) { + qpsQuota = quotas.get(appName); + } else { + qpsQuota = _defaultQpsQuotaForApplication; + } + } + + if (qpsQuota < MIN_APP_QUOTA) { buildEmptyOrResetApplicationRateLimiter(appName); continue; } @@ -436,22 +465,6 @@ private double getEffectiveQueryQuotaOnDatabase(String databaseName) { return _defaultQpsQuotaForDatabase; } - /** - * Utility to get the effective query quota being imposed on an application. It is computed based on the default quota - * set at cluster config. - * - * @param applicationName application name to get the query quota on. - * @return effective query quota limit being applied - */ - private double getEffectiveQueryQuotaOnApplication(String applicationName) { - Map quotas = - ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); - if (quotas != null && quotas.get(applicationName) != null && quotas.get(applicationName) != -1.0d) { - return quotas.get(applicationName); - } - return _defaultQpsQuotaForApplication; - } - /** * Creates a new database rate limiter. Will not update the database rate limiter if it already exists. * @param databaseName database name for which rate limiter needs to be created @@ -472,7 +485,7 @@ public void createApplicationRateLimiter(String applicationName) { if (_applicationRateLimiterMap.containsKey(applicationName)) { return; } - createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); + createOrUpdateApplicationRateLimiter(applicationName); } /** @@ -579,10 +592,12 @@ public boolean acquireApplication(String applicationName) { } QueryQuotaEntity queryQuota = _applicationRateLimiterMap.get(applicationName); if (queryQuota == null) { - if (getDefaultQueryQuotaForApplication() < 0) { + // do not create a new rate limiter because that could lead to OOM if client floods us with many unique app names + if (_defaultQpsQuotaForApplication < MIN_APP_QUOTA) { return true; } else { - createOrUpdateApplicationRateLimiter(applicationName); + // create limiter without querying ZK + createOrUpdateApplicationRateLimiter(applicationName, _defaultQpsQuotaForApplication); queryQuota = _applicationRateLimiterMap.get(applicationName); } } @@ -809,8 +824,8 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke if (quota.getNumOnlineBrokers() != onlineBrokerCount) { quota.setNumOnlineBrokers(onlineBrokerCount); } - if (quota.getOverallRate() > 0) { - double qpsQuota = quota.getOverallRate() / onlineBrokerCount; + if (quota.getOverallRate() >= MIN_APP_QUOTA) { + double qpsQuota = Math.max(quota.getOverallRate() / onlineBrokerCount, MIN_APP_QUOTA); quota.setRateLimiter(RateLimiter.create(qpsQuota)); } } @@ -843,7 +858,7 @@ public void processApplicationQueryRateLimitingClusterConfigChange() { if (oldQpsQuota == _defaultQpsQuotaForApplication) { return; } - createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet())); + createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet()), DISABLED_APP_QUOTA); } private double getDefaultQueryQuotaForDatabase() { @@ -857,11 +872,16 @@ private double getDefaultQueryQuotaForDatabase() { private double getDefaultQueryQuotaForApplication() { HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool(); - HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( - _helixManager.getClusterName()).build(); - return Double.parseDouble(helixAdmin.getConfig(configScope, + HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(_helixManager.getClusterName()).build(); + String value = helixAdmin.getConfig(configScope, Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)) - .getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1")); + .get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND); + if (value != null) { + return Double.parseDouble(value); + } else { + return DISABLED_APP_QUOTA; + } } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java index db050168faf9..dec69d80ffa0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java @@ -71,7 +71,7 @@ public class PinotApplicationQuotaRestletResource { PinotHelixResourceManager _pinotHelixResourceManager; /** - * API to get application quota configs. Will return null if application quotas are not defined + * API to get application quota configs. Will return empty map if application quotas are not defined at all. */ @GET @Produces(MediaType.APPLICATION_JSON) @@ -88,7 +88,7 @@ public Map getApplicationQuotas(@Context HttpHeaders httpHeaders } /** - * API to get application quota configs. Will return null if application quotas are not defined + * API to get application quota config. Will return null if application quotas is not defined. */ @GET @Produces(MediaType.APPLICATION_JSON) @@ -104,15 +104,16 @@ public Double getApplicationQuota(@Context HttpHeaders httpHeaders, @PathParam(" HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( _pinotHelixResourceManager.getHelixClusterName()).build(); + HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin(); String defaultQuota = helixAdmin.getConfig(scope, Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)) - .getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, null); + .get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND); return defaultQuota != null ? Double.parseDouble(defaultQuota) : null; } /** - * API to update the quota configs for application + * API to update the quota config for application. */ @POST @Produces(MediaType.APPLICATION_JSON) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java index a40cbdf2909c..5e807d55562f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -319,7 +319,7 @@ private void verifyQuotaUpdate(float quotaQps) { } catch (IOException e) { throw new RuntimeException(e); } - }, 5000, "Failed to reflect query quota on rate limiter in 5s."); + }, 10000, "Failed to reflect query quota on rate limiter in 5s."); } catch (AssertionError ae) { throw new AssertionError( ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae); From 1d6ca63df92309197dcf00547a7c66182fb045a0 Mon Sep 17 00:00:00 2001 From: Bolek Ziobrowski <26925920+bziobrowski@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:25:30 +0100 Subject: [PATCH 2/4] Revert unnecessary changes, add comments. --- .../queryquota/HelixExternalViewBasedQueryQuotaManager.java | 1 + .../integration/tests/QueryQuotaClusterIntegrationTest.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index b0d73f176549..484d8276b084 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -825,6 +825,7 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke quota.setNumOnlineBrokers(onlineBrokerCount); } if (quota.getOverallRate() >= MIN_APP_QUOTA) { + // number of permits must be positive but dividing by broker's count can result in 0 double qpsQuota = Math.max(quota.getOverallRate() / onlineBrokerCount, MIN_APP_QUOTA); quota.setRateLimiter(RateLimiter.create(qpsQuota)); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java index 5e807d55562f..a40cbdf2909c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -319,7 +319,7 @@ private void verifyQuotaUpdate(float quotaQps) { } catch (IOException e) { throw new RuntimeException(e); } - }, 10000, "Failed to reflect query quota on rate limiter in 5s."); + }, 5000, "Failed to reflect query quota on rate limiter in 5s."); } catch (AssertionError ae) { throw new AssertionError( ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae); From 8af9c397101f28f55cfa804e24edf566f8e6e415 Mon Sep 17 00:00:00 2001 From: Bolek Ziobrowski <26925920+bziobrowski@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:08:05 +0100 Subject: [PATCH 3/4] Apply CR suggestions. --- ...lixExternalViewBasedQueryQuotaManager.java | 79 +++++++++++-------- 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 484d8276b084..925fbc4860eb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -75,8 +75,9 @@ */ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager { - // Minimum 'working' value for app quota. If actual value is less than this (e.g. 0.0), it is considered as disabled. - private static final double MIN_APP_QUOTA = Math.nextUp(0.0); + // Maximum 'disabled' value for app quota. If actual value is equal or less than this, it is considered as + // disabled, otherwise it's enabled. This is a side effect of rate limiter accepting only positive values. + private static final double MAX_DISABLED_APP_QUOTA = 0.0d; // standard value meaning - no app quota limit set private static final double DISABLED_APP_QUOTA = -1; @@ -136,10 +137,9 @@ private void initializeApplicationQpsQuotas() { String appName = entry.getKey(); double appQpsQuota = - entry.getValue() != null && entry.getValue() >= MIN_APP_QUOTA ? entry.getValue() - : _defaultQpsQuotaForApplication; + entry.getValue() != null ? entry.getValue() : _defaultQpsQuotaForApplication; - if (appQpsQuota < MIN_APP_QUOTA) { + if (isDisabled(appQpsQuota)) { buildEmptyOrResetApplicationRateLimiter(appName); continue; } @@ -151,8 +151,14 @@ private void initializeApplicationQpsQuotas() { new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), numOnlineBrokers, appQpsQuota, -1); _applicationRateLimiterMap.put(appName, queryQuotaEntity); } + } + + private static boolean isEnabled(double appQpsQuota) { + return appQpsQuota > MAX_DISABLED_APP_QUOTA; + } - return; + private static boolean isDisabled(double appQpsQuota) { + return appQpsQuota <= MAX_DISABLED_APP_QUOTA; } @Override @@ -358,38 +364,42 @@ public synchronized void createOrUpdateApplicationRateLimiter(String application createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), DISABLED_APP_QUOTA); } - public synchronized void createOrUpdateApplicationRateLimiter(String applicationName, double override) { - createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), override); + public synchronized void createOrUpdateApplicationRateLimiter(String applicationName, double newQps) { + createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), newQps); + } + + private synchronized void createOrUpdateApplicationRateLimiter(List applicationNames) { + createOrUpdateApplicationRateLimiter(applicationNames, DISABLED_APP_QUOTA); } - // Caller method need not worry about getting lock on _applicationRateLimiterMap - // as this method will do idempotent updates to the application rate limiters - private synchronized void createOrUpdateApplicationRateLimiter(List applicationNames, double override) { + /** + * Caller method need not worry about getting lock on _applicationRateLimiterMap + * as this method will do idempotent updates to the application rate limiters + * @param applicationNames application names for which to update the rate limiter + * @param newQps - if > 0, fixed value to use for rate limiter(s), otherwise value is fetched from ZK. + */ + private synchronized void createOrUpdateApplicationRateLimiter(List applicationNames, double newQps) { ExternalView brokerResource = getBrokerResource(); Map quotas = null; - boolean quotasInitialized = false; + if (applicationNames.size() > 0 && !isEnabled(newQps)) { + quotas = ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); + } for (String appName : applicationNames) { double qpsQuota; - if (override >= MIN_APP_QUOTA) { - qpsQuota = override; + if (isEnabled(newQps)) { + qpsQuota = newQps; + } else if (quotas != null && quotas.get(appName) != null) { + qpsQuota = quotas.get(appName); } else { - if (!quotasInitialized) { - quotas = ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); - quotasInitialized = true; - } - - if (quotas != null && quotas.get(appName) != null && quotas.get(appName) >= MIN_APP_QUOTA) { - qpsQuota = quotas.get(appName); - } else { - qpsQuota = _defaultQpsQuotaForApplication; - } + qpsQuota = _defaultQpsQuotaForApplication; } - if (qpsQuota < MIN_APP_QUOTA) { + if (isDisabled(qpsQuota)) { buildEmptyOrResetApplicationRateLimiter(appName); continue; } + int numOnlineBrokers = getNumOnlineBrokers(brokerResource); double perBrokerQpsQuota = qpsQuota / numOnlineBrokers; QueryQuotaEntity oldEntity = _applicationRateLimiterMap.get(appName); @@ -593,7 +603,7 @@ public boolean acquireApplication(String applicationName) { QueryQuotaEntity queryQuota = _applicationRateLimiterMap.get(applicationName); if (queryQuota == null) { // do not create a new rate limiter because that could lead to OOM if client floods us with many unique app names - if (_defaultQpsQuotaForApplication < MIN_APP_QUOTA) { + if (isDisabled(_defaultQpsQuotaForApplication)) { return true; } else { // create limiter without querying ZK @@ -824,10 +834,12 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke if (quota.getNumOnlineBrokers() != onlineBrokerCount) { quota.setNumOnlineBrokers(onlineBrokerCount); } - if (quota.getOverallRate() >= MIN_APP_QUOTA) { - // number of permits must be positive but dividing by broker's count can result in 0 - double qpsQuota = Math.max(quota.getOverallRate() / onlineBrokerCount, MIN_APP_QUOTA); - quota.setRateLimiter(RateLimiter.create(qpsQuota)); + if (isEnabled(quota.getOverallRate())) { + double qpsQuota = quota.getOverallRate() / onlineBrokerCount; + // dividing small qps value by broker's count can result in 0 and blow up in rate limiter + if (isEnabled(qpsQuota)) { + quota.setRateLimiter(RateLimiter.create(qpsQuota)); + } } } @@ -836,9 +848,8 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke } _lastKnownBrokerResourceVersion.set(currentVersionNumber); long endTime = System.currentTimeMillis(); - LOGGER - .info("Processed query quota change in {}ms, {} out of {} query quota configs rebuilt.", (endTime - startTime), - numRebuilt, _rateLimiterMap.size()); + LOGGER.info("Processed query quota change in {}ms, {} out of {} query quota configs rebuilt.", + (endTime - startTime), numRebuilt, _rateLimiterMap.size()); } /** @@ -859,7 +870,7 @@ public void processApplicationQueryRateLimitingClusterConfigChange() { if (oldQpsQuota == _defaultQpsQuotaForApplication) { return; } - createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet()), DISABLED_APP_QUOTA); + createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet())); } private double getDefaultQueryQuotaForDatabase() { From e4395e10f7174f6cb9e39ce94d7e2c3e41935099 Mon Sep 17 00:00:00 2001 From: Bolek Ziobrowski <26925920+bziobrowski@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:30:12 +0100 Subject: [PATCH 4/4] Apply CR suggestions. --- ...xternalViewBasedQueryQuotaManagerTest.java | 2 +- .../PinotApplicationQuotaRestletResource.java | 5 +- .../QueryQuotaClusterIntegrationTest.java | 68 ++++++++++++++----- 3 files changed, 56 insertions(+), 19 deletions(-) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java index 131faee022dd..e2efb05f45ec 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java @@ -338,7 +338,7 @@ public void testWhenOnlyDefaultAppQuotaIsSetItAffectsAllApplications() } @Test - public void tesCreateAndUpdateAppRateLimiterChangesRateLimiterMap() { + public void testCreateAndUpdateAppRateLimiterChangesRateLimiterMap() { Map apps = new HashMap<>(); apps.put("app1", null); apps.put("app2", 1d); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java index dec69d80ffa0..d539e6c3aba8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotApplicationQuotaRestletResource.java @@ -102,8 +102,9 @@ public Double getApplicationQuota(@Context HttpHeaders httpHeaders, @PathParam(" return quotas.get(appName); } - HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( - _pinotHelixResourceManager.getHelixClusterName()).build(); + HelixConfigScope scope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) + .forCluster(_pinotHelixResourceManager.getHelixClusterName()) + .build(); HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin(); String defaultQuota = diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java index a40cbdf2909c..d87731961fca 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java @@ -142,10 +142,35 @@ public void testDefaultDatabaseQueryQuotaOverride() public void testDefaultApplicationQueryQuotaOverride() throws Exception { addAppQueryQuotaToClusterConfig(25); + // override lower than default quota setQueryQuotaForApplication(10); testQueryRate(10); + // override higher than default quota + setQueryQuotaForApplication(27); + testQueryRate(27); + + // disable + setQueryQuotaForApplication(-1); + verifyQuotaUpdate(Long.MAX_VALUE); + runQueries(50, false); + + // verify that default still applies to other application names + runQueries(20, false, "other"); + //increase the qps and some of the queries should be throttled. + runQueries(50, true, "other"); + } + + @Test + public void testDisabledDefaultApplicationQueryQuotaOverride() + throws Exception { + addAppQueryQuotaToClusterConfig(-1); + + verifyQuotaUpdate(Long.MAX_VALUE); + runQueries(10, false); + + // override default quota setQueryQuotaForApplication(40); testQueryRate(40); } @@ -264,48 +289,58 @@ private static void sleep(long deadline, double iterationsLeft) { } } + private void runQueries(int qps, boolean shouldFail) { + runQueries(qps, shouldFail, "default"); + } + // try to keep the qps below 50 to ensure that the time lost between 2 query runs on top of the sleepMillis // is not comparable to sleepMillis, else the actual qps would end up being much lower than required qps - private void runQueries(int qps, boolean shouldFail) { + private void runQueries(int qps, boolean shouldFail, String applicationName) { int failCount = 0; + boolean isLastFail = false; long deadline = System.currentTimeMillis() + 1000; + String query = "SET applicationName='" + applicationName + "'; SELECT COUNT(*) FROM " + getTableName(); + for (int i = 0; i < qps; i++) { sleep(deadline, qps - i); - ResultSetGroup resultSetGroup = - _pinotConnection.execute("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName()); + ResultSetGroup resultSetGroup = _pinotConnection.execute(query); for (PinotClientException exception : resultSetGroup.getExceptions()) { if (exception.getMessage().contains("QuotaExceededError")) { failCount++; + isLastFail = i == qps - 1; break; } } } if (shouldFail) { - assertTrue(failCount != 0, "Expected nonzero failures for qps: " + qps); + Assert.assertNotEquals(failCount, 0, "Expected nonzero failures for qps: " + qps + " isLastFail: " + isLastFail); } else { - Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " + qps); + Assert.assertEquals(failCount, 0, "Expected zero failures for qps: " + qps + " isLastFail: " + isLastFail); } } - private static volatile float _quota; + private static volatile double _quota; private static volatile String _quotaSource; - private void verifyQuotaUpdate(float quotaQps) { + private void verifyQuotaUpdate(double quotaQps) { try { TestUtils.waitForCondition(aVoid -> { try { - float tableQuota = Float.parseFloat(sendGetRequest( - String.format("http://%s/debug/tables/queryQuota/%s_OFFLINE", _brokerHostPort, getTableName()))); + double tableQuota = Double.parseDouble(sendGetRequest( + "http://" + _brokerHostPort + "/debug/tables/queryQuota/" + getTableName() + "_OFFLINE")); + double dbQuota = Double.parseDouble( + sendGetRequest("http://" + _brokerHostPort + "/debug/databases/queryQuota/default")); + double appQuota = Double.parseDouble( + sendGetRequest("http://" + _brokerHostPort + "/debug/applicationQuotas/default")); + tableQuota = tableQuota == 0 ? Long.MAX_VALUE : tableQuota; - float dbQuota = Float.parseFloat( - sendGetRequest(String.format("http://%s/debug/databases/queryQuota/default", _brokerHostPort))); - float appQuota = Float.parseFloat( - sendGetRequest(String.format("http://%s/debug/applicationQuotas/default", _brokerHostPort))); dbQuota = dbQuota == 0 ? Long.MAX_VALUE : dbQuota; appQuota = appQuota == 0 ? Long.MAX_VALUE : appQuota; - float actualQuota = Math.min(Math.min(tableQuota, dbQuota), appQuota); + + double actualQuota = Math.min(Math.min(tableQuota, dbQuota), appQuota); + _quota = actualQuota; if (_quota == dbQuota) { _quotaSource = "database"; @@ -314,12 +349,13 @@ private void verifyQuotaUpdate(float quotaQps) { } else { _quotaSource = "application"; } - return quotaQps == actualQuota || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == Long.MAX_VALUE + return Math.abs(quotaQps - actualQuota) < 0.01 + || (quotaQps == 0 && tableQuota == Long.MAX_VALUE && dbQuota == Long.MAX_VALUE && appQuota == Long.MAX_VALUE); } catch (IOException e) { throw new RuntimeException(e); } - }, 5000, "Failed to reflect query quota on rate limiter in 5s."); + }, 10000, "Failed to reflect query quota on rate limiter in 5s."); } catch (AssertionError ae) { throw new AssertionError( ae.getMessage() + " Expected quota:" + quotaQps + " but is: " + _quota + " set on: " + _quotaSource, ae);