-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix application qps quota stalls. #14859
base: master
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -74,6 +74,12 @@ | |||||
* - broker added or removed from cluster | ||||||
*/ | ||||||
public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager { | ||||||
|
||||||
// Minimum 'working' value for app quota. If actual value is less than this (e.g. 0.0), it is considered as disabled. | ||||||
private static final double MIN_APP_QUOTA = Math.nextUp(0.0); | ||||||
// standard value meaning - no app quota limit set | ||||||
private static final double DISABLED_APP_QUOTA = -1; | ||||||
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class); | ||||||
private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1; | ||||||
private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60; | ||||||
|
@@ -130,9 +136,10 @@ private void initializeApplicationQpsQuotas() { | |||||
|
||||||
String appName = entry.getKey(); | ||||||
double appQpsQuota = | ||||||
entry.getValue() != null && entry.getValue() != -1.0d ? entry.getValue() : _defaultQpsQuotaForApplication; | ||||||
entry.getValue() != null && entry.getValue() >= MIN_APP_QUOTA ? entry.getValue() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not introduced in this PR, but we might want to allow overriding default quota to disable throttling
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Applied. |
||||||
: _defaultQpsQuotaForApplication; | ||||||
|
||||||
if (appQpsQuota < 0) { | ||||||
if (appQpsQuota < MIN_APP_QUOTA) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same for other places
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment above. |
||||||
buildEmptyOrResetApplicationRateLimiter(appName); | ||||||
continue; | ||||||
} | ||||||
|
@@ -348,16 +355,38 @@ private synchronized void createOrUpdateDatabaseRateLimiter(List<String> databas | |||||
} | ||||||
|
||||||
public synchronized void createOrUpdateApplicationRateLimiter(String applicationName) { | ||||||
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); | ||||||
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), DISABLED_APP_QUOTA); | ||||||
} | ||||||
|
||||||
public synchronized void createOrUpdateApplicationRateLimiter(String applicationName, double override) { | ||||||
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName), override); | ||||||
} | ||||||
|
||||||
// Caller method need not worry about getting lock on _applicationRateLimiterMap | ||||||
// as this method will do idempotent updates to the application rate limiters | ||||||
private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames) { | ||||||
private synchronized void createOrUpdateApplicationRateLimiter(List<String> applicationNames, double override) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider making override a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added more method to hide the override when it's not needed but kept it as primitive because null is not necessarily more readable and would trigger boxing/unboxing. |
||||||
ExternalView brokerResource = getBrokerResource(); | ||||||
Map<String, Double> quotas = null; | ||||||
boolean quotasInitialized = false; | ||||||
|
||||||
for (String appName : applicationNames) { | ||||||
double qpsQuota = getEffectiveQueryQuotaOnApplication(appName); | ||||||
if (qpsQuota < 0) { | ||||||
double qpsQuota; | ||||||
if (override >= MIN_APP_QUOTA) { | ||||||
qpsQuota = override; | ||||||
} else { | ||||||
if (!quotasInitialized) { | ||||||
quotas = ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); | ||||||
quotasInitialized = true; | ||||||
} | ||||||
|
||||||
if (quotas != null && quotas.get(appName) != null && quotas.get(appName) >= MIN_APP_QUOTA) { | ||||||
qpsQuota = quotas.get(appName); | ||||||
} else { | ||||||
qpsQuota = _defaultQpsQuotaForApplication; | ||||||
} | ||||||
} | ||||||
|
||||||
if (qpsQuota < MIN_APP_QUOTA) { | ||||||
buildEmptyOrResetApplicationRateLimiter(appName); | ||||||
continue; | ||||||
} | ||||||
|
@@ -436,22 +465,6 @@ private double getEffectiveQueryQuotaOnDatabase(String databaseName) { | |||||
return _defaultQpsQuotaForDatabase; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Utility to get the effective query quota being imposed on an application. It is computed based on the default quota | ||||||
* set at cluster config. | ||||||
* | ||||||
* @param applicationName application name to get the query quota on. | ||||||
* @return effective query quota limit being applied | ||||||
*/ | ||||||
private double getEffectiveQueryQuotaOnApplication(String applicationName) { | ||||||
Map<String, Double> quotas = | ||||||
ZKMetadataProvider.getApplicationQpsQuotas(_helixManager.getHelixPropertyStore()); | ||||||
if (quotas != null && quotas.get(applicationName) != null && quotas.get(applicationName) != -1.0d) { | ||||||
return quotas.get(applicationName); | ||||||
} | ||||||
return _defaultQpsQuotaForApplication; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Creates a new database rate limiter. Will not update the database rate limiter if it already exists. | ||||||
* @param databaseName database name for which rate limiter needs to be created | ||||||
|
@@ -472,7 +485,7 @@ public void createApplicationRateLimiter(String applicationName) { | |||||
if (_applicationRateLimiterMap.containsKey(applicationName)) { | ||||||
return; | ||||||
} | ||||||
createOrUpdateApplicationRateLimiter(Collections.singletonList(applicationName)); | ||||||
createOrUpdateApplicationRateLimiter(applicationName); | ||||||
} | ||||||
|
||||||
/** | ||||||
|
@@ -579,10 +592,12 @@ public boolean acquireApplication(String applicationName) { | |||||
} | ||||||
QueryQuotaEntity queryQuota = _applicationRateLimiterMap.get(applicationName); | ||||||
if (queryQuota == null) { | ||||||
if (getDefaultQueryQuotaForApplication() < 0) { | ||||||
// do not create a new rate limiter because that could lead to OOM if client floods us with many unique app names | ||||||
if (_defaultQpsQuotaForApplication < MIN_APP_QUOTA) { | ||||||
return true; | ||||||
} else { | ||||||
createOrUpdateApplicationRateLimiter(applicationName); | ||||||
// create limiter without querying ZK | ||||||
createOrUpdateApplicationRateLimiter(applicationName, _defaultQpsQuotaForApplication); | ||||||
queryQuota = _applicationRateLimiterMap.get(applicationName); | ||||||
} | ||||||
} | ||||||
|
@@ -809,8 +824,9 @@ public void processQueryRateLimitingExternalViewChange(ExternalView currentBroke | |||||
if (quota.getNumOnlineBrokers() != onlineBrokerCount) { | ||||||
quota.setNumOnlineBrokers(onlineBrokerCount); | ||||||
} | ||||||
if (quota.getOverallRate() > 0) { | ||||||
double qpsQuota = quota.getOverallRate() / onlineBrokerCount; | ||||||
if (quota.getOverallRate() >= MIN_APP_QUOTA) { | ||||||
// number of permits must be positive but dividing by broker's count can result in 0 | ||||||
double qpsQuota = Math.max(quota.getOverallRate() / onlineBrokerCount, MIN_APP_QUOTA); | ||||||
quota.setRateLimiter(RateLimiter.create(qpsQuota)); | ||||||
} | ||||||
} | ||||||
|
@@ -843,7 +859,7 @@ public void processApplicationQueryRateLimitingClusterConfigChange() { | |||||
if (oldQpsQuota == _defaultQpsQuotaForApplication) { | ||||||
return; | ||||||
} | ||||||
createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet())); | ||||||
createOrUpdateApplicationRateLimiter(new ArrayList<>(_applicationRateLimiterMap.keySet()), DISABLED_APP_QUOTA); | ||||||
} | ||||||
|
||||||
private double getDefaultQueryQuotaForDatabase() { | ||||||
|
@@ -857,11 +873,16 @@ private double getDefaultQueryQuotaForDatabase() { | |||||
|
||||||
private double getDefaultQueryQuotaForApplication() { | ||||||
HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool(); | ||||||
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( | ||||||
_helixManager.getClusterName()).build(); | ||||||
return Double.parseDouble(helixAdmin.getConfig(configScope, | ||||||
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) | ||||||
.forCluster(_helixManager.getClusterName()).build(); | ||||||
String value = helixAdmin.getConfig(configScope, | ||||||
Collections.singletonList(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND)) | ||||||
.getOrDefault(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND, "-1")); | ||||||
.get(CommonConstants.Helix.APPLICATION_MAX_QUERIES_PER_SECOND); | ||||||
if (value != null) { | ||||||
return Double.parseDouble(value); | ||||||
} else { | ||||||
return DISABLED_APP_QUOTA; | ||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the intention here is to treat
0
as disabled. It is not very readable to have this minimum double, can we change the comparison (e.g.<
to<=
) sign instead?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the value and hid logic behind isDisabled(), isEnabled() methods.