Skip to content

Commit

Permalink
Debug endpoints to fetch effective query quotas on broker (#13864)
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 authored Aug 30, 2024
1 parent e14d887 commit fc132c3
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.DatabaseUtils;
Expand Down Expand Up @@ -92,6 +93,9 @@ public class PinotBrokerDebug {
@Inject
private ServerRoutingStatsManager _serverRoutingStatsManager;

@Inject
private QueryQuotaManager _queryQuotaManager;

@Inject
AccessControlFactory _accessControlFactory;

Expand Down Expand Up @@ -295,4 +299,28 @@ public Collection<? extends QueryResourceTracker> getQueryUsage() {
ThreadResourceUsageAccountant threadAccountant = Tracing.getThreadAccountant();
return threadAccountant.getQueryResources().values();
}

@GET
@Path("debug/tables/queryQuota/{tableName}")
@Produces(MediaType.TEXT_PLAIN)
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_TABLE_QUERY_QUOTA)
@ApiOperation(value = "Get the active query quota being imposed on the table", notes = "This is a debug endpoint, "
+ "and won't maintain backward compatibility")
public String getTableQueryQuota(
@ApiParam(value = "Name of the table with type") @PathParam("tableName") String tableName,
@Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
return String.valueOf(_queryQuotaManager.getTableQueryQuota(tableName));
}

@GET
@Path("debug/databases/queryQuota/{databaseName}")
@Produces(MediaType.TEXT_PLAIN)
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_DATABASE_QUERY_QUOTA)
@ApiOperation(value = "Get the active query quota being imposed on the database", notes = "This is a debug endpoint, "
+ "and won't maintain backward compatibility")
public String getDatabaseQueryQuota(
@ApiParam(value = "Name of the database") @PathParam("databaseName") String databaseName) {
return String.valueOf(_queryQuotaManager.getDatabaseQueryQuota(databaseName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.util.Timeout;
import org.apache.helix.HelixManager;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.http.PoolingHttpClientConnectionManagerHelper;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class BrokerAdminApiApplication extends ResourceConfig {
public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequestHandler brokerRequestHandler,
BrokerMetrics brokerMetrics, PinotConfiguration brokerConf, SqlQueryExecutor sqlQueryExecutor,
ServerRoutingStatsManager serverRoutingStatsManager, AccessControlFactory accessFactory,
HelixManager helixManager) {
HelixManager helixManager, QueryQuotaManager queryQuotaManager) {
_brokerResourcePackages = brokerConf.getProperty(CommonConstants.Broker.BROKER_RESOURCE_PACKAGES,
CommonConstants.Broker.DEFAULT_BROKER_RESOURCE_PACKAGES);
String[] pkgs = _brokerResourcePackages.split(",");
Expand Down Expand Up @@ -112,6 +113,7 @@ protected void configure() {
}
bind(brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID)).named(BROKER_INSTANCE_ID);
bind(serverRoutingStatsManager).to(ServerRoutingStatsManager.class);
bind(queryQuotaManager).to(QueryQuotaManager.class);
bind(accessFactory).to(AccessControlFactory.class);
bind(startTime).named(BrokerAdminApiApplication.START_TIME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected HelixManager _participantHelixManager;
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
protected HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -288,9 +289,8 @@ public void start()
// Adding cluster name to the config so that it can be used by the AccessControlFactory
factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
_accessControlFactory = AccessControlFactory.loadFactory(factoryConf, _propertyStore);
HelixExternalViewBasedQueryQuotaManager queryQuotaManager =
new HelixExternalViewBasedQueryQuotaManager(_brokerMetrics, _instanceId);
queryQuotaManager.init(_spectatorHelixManager);
_queryQuotaManager = new HelixExternalViewBasedQueryQuotaManager(_brokerMetrics, _instanceId);
_queryQuotaManager.init(_spectatorHelixManager);
// Initialize QueryRewriterFactory
LOGGER.info("Initializing QueryRewriterFactory");
QueryRewriterFactory.init(_brokerConf.getProperty(Broker.CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES));
Expand All @@ -311,9 +311,8 @@ public void start()
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache);
singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
Expand All @@ -324,7 +323,7 @@ public void start()
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
if (_brokerConf.getProperty(Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
Expand All @@ -333,7 +332,7 @@ public void start()
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache);
_queryQuotaManager, tableCache);
}
_brokerRequestHandler =
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, multiStageBrokerRequestHandler);
Expand Down Expand Up @@ -364,7 +363,7 @@ public void start()
for (ClusterChangeHandler clusterConfigChangeHandler : _clusterConfigChangeHandlers) {
clusterConfigChangeHandler.init(_spectatorHelixManager);
}
_clusterConfigChangeHandlers.add(queryQuotaManager);
_clusterConfigChangeHandlers.add(_queryQuotaManager);
for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) {
idealStateChangeHandler.init(_spectatorHelixManager);
}
Expand All @@ -373,12 +372,12 @@ public void start()
externalViewChangeHandler.init(_spectatorHelixManager);
}
_externalViewChangeHandlers.add(_routingManager);
_externalViewChangeHandlers.add(queryQuotaManager);
_externalViewChangeHandlers.add(_queryQuotaManager);
for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) {
instanceConfigChangeHandler.init(_spectatorHelixManager);
}
_instanceConfigChangeHandlers.add(_routingManager);
_instanceConfigChangeHandlers.add(queryQuotaManager);
_instanceConfigChangeHandlers.add(_queryQuotaManager);
for (ClusterChangeHandler liveInstanceChangeHandler : _liveInstanceChangeHandlers) {
liveInstanceChangeHandler.init(_spectatorHelixManager);
}
Expand Down Expand Up @@ -407,11 +406,11 @@ public void start()
_participantHelixManager.getStateMachineEngine()
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager,
queryQuotaManager));
_queryQuotaManager));
// Register user-define message handler factory
_participantHelixManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
new BrokerUserDefinedMessageHandlerFactory(_routingManager, queryQuotaManager));
new BrokerUserDefinedMessageHandlerFactory(_routingManager, _queryQuotaManager));
_participantHelixManager.connect();
updateInstanceConfigAndBrokerResourceIfNeeded();
_brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
Expand Down Expand Up @@ -619,7 +618,8 @@ public BrokerRequestHandler getBrokerRequestHandler() {
protected BrokerAdminApiApplication createBrokerAdminApp() {
BrokerAdminApiApplication brokerAdminApiApplication =
new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, _brokerMetrics, _brokerConf,
_sqlQueryExecutor, _serverRoutingStatsManager, _accessControlFactory, _spectatorHelixManager);
_sqlQueryExecutor, _serverRoutingStatsManager, _accessControlFactory, _spectatorHelixManager,
_queryQuotaManager);
registerExtraComponents(brokerAdminApiApplication);
return brokerAdminApiApplication;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,20 @@ public boolean acquireDatabase(String databaseName) {
return tryAcquireToken(databaseName, queryQuota);
}

@Override
public double getTableQueryQuota(String tableNameWithType) {
return getQueryQuota(_rateLimiterMap.get(tableNameWithType));
}

@Override
public double getDatabaseQueryQuota(String databaseName) {
return getQueryQuota(_databaseRateLimiterMap.get(databaseName));
}

private double getQueryQuota(QueryQuotaEntity quotaEntity) {
return quotaEntity == null || quotaEntity.getRateLimiter() == null ? 0 : quotaEntity.getRateLimiter().getRate();
}

/**
* {@inheritDoc}
* <p>Acquires a token from rate limiter based on the table name.
Expand Down Expand Up @@ -512,11 +526,6 @@ public Map<String, QueryQuotaEntity> getDatabaseRateLimiterMap() {
return _databaseRateLimiterMap;
}

@VisibleForTesting
public QueryQuotaEntity getRateLimiterForTable(String tableNameWithType) {
return _rateLimiterMap.get(tableNameWithType);
}

@VisibleForTesting
public void cleanUpRateLimiterMap() {
_rateLimiterMap.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,18 @@ public interface QueryQuotaManager {
* @return {@code true} if the database quota has not been reached, {@code false} otherwise
*/
boolean acquireDatabase(String databaseName);

/**
* Get the QPS quota in effect for the table
* @param tableNameWithType table name with type
* @return effective quota qps. 0 if no qps quota is set.
*/
double getTableQueryQuota(String tableNameWithType);

/**
* Get the QPS quota in effect for the database
* @param databaseName table name with type
* @return effective quota qps. 0 if no qps quota is set.
*/
double getDatabaseQueryQuota(String databaseName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,7 @@ public void testOfflineTableWithNullQuotaAndNoRealtimeTableConfig() {
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
Assert.assertNull(queryQuotaEntity.getRateLimiter());
Assert.assertEquals(_queryQuotaManager.getTableQueryQuota(OFFLINE_TABLE_NAME), 0);
}

@Test
Expand All @@ -306,8 +305,7 @@ public void testOfflineTableWithNullQuotaButWithRealtimeTableConfigNullQpsConfig
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
Assert.assertNull(queryQuotaEntity.getRateLimiter());
Assert.assertEquals(_queryQuotaManager.getTableQueryQuota(OFFLINE_TABLE_NAME), 0);

// Nothing happened since it doesn't have qps quota.
_queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
Expand All @@ -328,8 +326,7 @@ public void testOfflineTableWithNullQuotaButWithRealtimeTableConfigNotNullQpsCon
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
Assert.assertNull(queryQuotaEntity.getRateLimiter());
Assert.assertEquals(_queryQuotaManager.getTableQueryQuota(OFFLINE_TABLE_NAME), 0);

// Drop the offline table won't have any affect since it is table type specific.
_queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
Expand Down Expand Up @@ -415,8 +412,7 @@ public void testRealtimeTableWithNullQuotaAndNoOfflineTableConfig()
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
Assert.assertNull(queryQuotaEntity.getRateLimiter());
Assert.assertEquals(_queryQuotaManager.getTableQueryQuota(REALTIME_TABLE_NAME), 0);
}

@Test
Expand All @@ -433,8 +429,7 @@ public void testRealtimeTableWithNullQuotaButWithOfflineTableConfigNullQpsConfig
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
Assert.assertNull(queryQuotaEntity.getRateLimiter());
Assert.assertEquals(_queryQuotaManager.getTableQueryQuota(REALTIME_TABLE_NAME), 0);
}

@Test
Expand All @@ -450,8 +445,7 @@ public void testRealtimeTableWithNullQuotaButWithOfflineTableConfigNotNullQpsCon
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
Assert.assertNull(queryQuotaEntity.getRateLimiter());
Assert.assertEquals(_queryQuotaManager.getTableQueryQuota(REALTIME_TABLE_NAME), 0);
}

@Test
Expand All @@ -461,8 +455,7 @@ public void testNoBrokerResource()
setQps(tableConfig);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, null);
Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
Assert.assertNull(queryQuotaEntity);
Assert.assertEquals(_queryQuotaManager.getTableQueryQuota(REALTIME_TABLE_NAME), 0);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static class Cluster {
public static final String GET_VERSION = "GetVersion";
public static final String GET_ZNODE = "GetZnode";
public static final String GET_DATABASE_QUOTA = "GetDatabaseQuota";
public static final String GET_DATABASE_QUERY_QUOTA = "GetDatabaseQueryQuota";
public static final String INGEST_FILE = "IngestFile";
public static final String RECOMMEND_CONFIG = "RecommendConfig";
public static final String RESET_SEGMENT = "ResetSegment";
Expand Down Expand Up @@ -136,6 +137,7 @@ public static class Table {
public static final String GET_TABLE_CONFIG = "GetTableConfig";
public static final String GET_TABLE_CONFIGS = "GetTableConfigs";
public static final String GET_TABLE_LEADER = "GetTableLeader";
public static final String GET_TABLE_QUERY_QUOTA = "GetTableQueryQuota";
public static final String GET_TIME_BOUNDARY = "GetTimeBoundary";
public static final String GET_SCHEDULER_JOB_DETAILS = "GetSchedulerJobDetails";
public static final String PAUSE_CONSUMPTION = "PauseConsumption";
Expand Down

0 comments on commit fc132c3

Please sign in to comment.