Skip to content

Commit

Permalink
Enable query cancellation for MSQE + cancel using client-provided id (#…
Browse files Browse the repository at this point in the history
…14823)

It also adds a sleep function that can only be used in tests
  • Loading branch information
albertobastos authored Feb 11, 2025
1 parent 5d89aa2 commit f1509f8
Show file tree
Hide file tree
Showing 25 changed files with 786 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,38 +380,48 @@ public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspe
}

@DELETE
@Path("query/{queryId}")
@Path("query/{id}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the "
+ "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as "
@ApiOperation(value = "Cancel a query as identified by the id", notes = "No effect if no query exists for the "
+ "given id on the requested broker. Query may continue to run for a short while after calling cancel as "
+ "it's done in a non-blocking manner. The cancel method can be called multiple times.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 404, message = "Query not found on the requested broker")
})
public String cancelQuery(
@ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId,
@ApiParam(value = "Query id", required = true) @PathParam("id") String id,
@ApiParam(value = "Determines is query id is internal or provided by the client") @QueryParam("client")
@DefaultValue("false") boolean isClient,
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
try {
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled query: " + queryId;
if (isClient && _requestHandler.cancelQueryByClientId(id, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled client query: " + id;
if (verbose) {
resp += " with responses from servers: " + serverResponses;
}
return resp;
} else if (_requestHandler.cancelQuery(Long.parseLong(id), timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled query: " + id;
if (verbose) {
resp += " with responses from servers: " + serverResponses;
}
return resp;
}
} catch (NumberFormatException e) {
Response.status(Response.Status.BAD_REQUEST).entity(String.format("Invalid internal query id: %s", id));
} catch (Exception e) {
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(String.format("Failed to cancel query: %s on the broker due to error: %s", queryId, e.getMessage()))
.entity(String.format("Failed to cancel query: %s on the broker due to error: %s", id, e.getMessage()))
.build());
}
throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", queryId))
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", id))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
Expand All @@ -51,6 +57,7 @@
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
Expand All @@ -74,6 +81,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final QueryLogger _queryLogger;
@Nullable
protected final String _enableNullHandling;
/**
 * Maps broker-generated query id to the query string. Left {@code null} by the constructor when query cancellation
 * is not enabled in the broker configuration.
 */
protected final Map<Long, String> _queriesById;
/**
 * Maps broker-generated query id to client-provided query id. Left {@code null} by the constructor when query
 * cancellation is not enabled in the broker configuration.
 */
protected final Map<Long, String> _clientQueryIds;

public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
Expand All @@ -90,6 +105,16 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryLogger = new QueryLogger(config);
_enableNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING);

boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
if (enableQueryCancellation) {
_queriesById = new ConcurrentHashMap<>();
_clientQueryIds = new ConcurrentHashMap<>();
} else {
_queriesById = null;
_clientQueryIds = null;
}
}

@Override
Expand Down Expand Up @@ -179,6 +204,13 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception;

/**
 * Attempts to cancel an ongoing query identified by its broker-generated id.
 *
 * @param queryId broker-generated id of the query to cancel
 * @param timeoutMs timeout value forwarded unchanged from the public cancel entry point
 * @param executor executor forwarded unchanged from the public cancel entry point
 * @param connMgr HTTP connection manager forwarded unchanged from the public cancel entry point
 * @param serverResponses map forwarded unchanged from the public cancel entry point; callers may pass {@code null}
 * @return true if the query was successfully cancelled, false otherwise.
 */
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception;

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
// TODO: Add partial result flag to RequestContext
Expand Down Expand Up @@ -223,4 +255,66 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons
statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
statistics.setTraceInfo(response.getTraceInfo());
}

/**
 * Exposes the queries currently tracked by this handler as a read-only map from broker-generated query id to
 * query string.
 *
 * @throws IllegalStateException if query cancellation is not enabled on the broker
 */
@Override
public Map<Long, String> getRunningQueries() {
  if (!isQueryCancellationEnabled()) {
    throw new IllegalStateException("Query cancellation is not enabled on broker");
  }
  Map<Long, String> trackedQueries = _queriesById;
  return Collections.unmodifiableMap(trackedQueries);
}

/**
 * Delegates the cancellation to {@code handleCancel} and then stops tracking the query, whether or not the
 * cancellation succeeded or threw.
 *
 * @return whatever {@code handleCancel} reports for the given query id
 * @throws IllegalStateException if query cancellation is not enabled on the broker
 */
@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
    Map<String, Integer> serverResponses)
    throws Exception {
  if (!isQueryCancellationEnabled()) {
    throw new IllegalStateException("Query cancellation is not enabled on broker");
  }
  boolean cancelled;
  try {
    cancelled = handleCancel(queryId, timeoutMs, executor, connMgr, serverResponses);
  } finally {
    // Runs on both the normal and the exceptional path.
    onQueryFinish(queryId);
  }
  return cancelled;
}

/**
 * Resolves a client-provided query id to the broker-generated id of the first tracked query carrying it, then
 * cancels that query through {@link #cancelQuery}.
 *
 * @return false when no tracked query carries the given client id, otherwise the result of the cancellation
 * @throws IllegalStateException if query cancellation is not enabled on the broker
 */
@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
    HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
    throws Exception {
  Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
  Long matchingRequestId = null;
  for (Map.Entry<Long, String> tracked : _clientQueryIds.entrySet()) {
    if (clientQueryId.equals(tracked.getValue())) {
      matchingRequestId = tracked.getKey();
      break;
    }
  }
  if (matchingRequestId == null) {
    LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId);
    return false;
  }
  // Cancel only after the scan is over, mirroring the lookup-then-cancel order.
  return cancelQuery(matchingRequestId, timeoutMs, executor, connMgr, serverResponses);
}

/**
 * Reads the client-provided query id from the query options.
 *
 * @return the value stored under {@code CLIENT_QUERY_ID}, or {@code null} when there are no options or no such entry
 */
protected String extractClientRequestId(SqlNodeAndOptions sqlNodeAndOptions) {
  Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
  if (queryOptions == null) {
    return null;
  }
  return queryOptions.get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID);
}

/**
 * Starts tracking a query so that it can later be listed and cancelled. Does nothing when query cancellation is
 * disabled. The client-provided id is recorded only when it is not blank.
 *
 * @param extras not used by this implementation
 */
protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) {
  if (!isQueryCancellationEnabled()) {
    return;
  }
  _queriesById.put(requestId, query);
  if (StringUtils.isBlank(clientRequestId)) {
    LOGGER.debug("Keep track of running query: {}", requestId);
    return;
  }
  _clientQueryIds.put(requestId, clientRequestId);
  LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId);
}

/**
 * Stops tracking a query: drops both its query string and its client-provided id, if any. Does nothing when query
 * cancellation is disabled.
 */
protected void onQueryFinish(long requestId) {
  if (!isQueryCancellationEnabled()) {
    return;
  }
  _queriesById.remove(requestId);
  _clientQueryIds.remove(requestId);
  LOGGER.debug("Remove track of running query: {}", requestId);
}

/**
 * Reports whether this handler tracks running queries for cancellation, which is the case exactly when
 * {@code _queriesById} is non-null.
 */
protected boolean isQueryCancellationEnabled() {
return _queriesById != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -142,9 +141,10 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
protected final boolean _enableQueryLimitOverride;
protected final boolean _enableDistinctCountBitmapOverride;
protected final int _queryResponseLimit;
// maps broker-generated query id with the servers that are running the query
protected final Map<Long, QueryServers> _serversById;
// if >= 0, then overrides default limit of 10, otherwise setting is ignored
protected final int _defaultQueryLimit;
protected final Map<Long, QueryServers> _queriesById;
protected final boolean _enableMultistageMigrationMetric;
protected ExecutorService _multistageCompileExecutor;
protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
Expand All @@ -162,11 +162,15 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
if (this.isQueryCancellationEnabled()) {
_serversById = new ConcurrentHashMap<>();
} else {
_serversById = null;
}
_defaultQueryLimit = config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT,
Broker.DEFAULT_BROKER_QUERY_LIMIT);
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;

_enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC);
Expand Down Expand Up @@ -215,31 +219,33 @@ public void shutDown() {
}
}

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
}

/**
 * Returns the servers recorded for the given request id, or an empty set when nothing is recorded for it.
 * Fails with an IllegalStateException when query cancellation is not enabled on the broker.
 */
@VisibleForTesting
Set<ServerInstance> getRunningServers(long requestId) {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
QueryServers queryServers = _serversById.get(requestId);
return queryServers != null ? queryServers._servers : Collections.emptySet();
}

@Override
public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
protected void onQueryFinish(long requestId) {
  // Let the base class drop its own bookkeeping first, then forget the servers recorded for this query.
  super.onQueryFinish(requestId);
  if (!isQueryCancellationEnabled()) {
    return;
  }
  _serversById.remove(requestId);
}

@Override
protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
QueryServers queryServers = _serversById.get(queryId);
if (queryServers == null) {
return false;
}

// TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
// details
String globalQueryId = getGlobalQueryId(requestId);
String globalQueryId = getGlobalQueryId(queryId);
List<Pair<String, String>> serverUrls = new ArrayList<>();
for (ServerInstance serverInstance : queryServers._servers) {
serverUrls.add(Pair.of(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId), null));
Expand Down Expand Up @@ -807,7 +813,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}
BrokerResponseNative brokerResponse;
if (_queriesById != null) {
if (isQueryCancellationEnabled()) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any
// potential failures that could happen before sending it out, like failures to calculate the routing table etc.
// TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
Expand All @@ -816,14 +822,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
LOGGER.debug("Keep track of running query: {}", requestId);
String clientRequestId = extractClientRequestId(sqlNodeAndOptions);
onQueryStart(
requestId, clientRequestId, query, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
try {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
brokerResponse.setClientRequestId(clientRequestId);
} finally {
_queriesById.remove(requestId);
onQueryFinish(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
} else {
Expand Down Expand Up @@ -922,6 +930,14 @@ static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRouti
return errorMessage;
}

/**
 * Tracks the query in the base class and, when the first extra argument is a {@code QueryServers}, additionally
 * records it under the request id so that cancel requests can reach the servers running the query.
 */
@Override
protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) {
  super.onQueryStart(requestId, clientRequestId, query, extras);
  if (!isQueryCancellationEnabled() || extras.length == 0) {
    return;
  }
  Object firstExtra = extras[0];
  if (firstExtra instanceof QueryServers) {
    _serversById.put(requestId, (QueryServers) firstExtra);
  }
}

private static String getRoutingPolicy(TableConfig tableConfig) {
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
if (routingConfig == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,19 @@ default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, Strin
boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;

/**
 * Cancel a query as identified by the clientQueryId provided externally. This method is non-blocking, so the query
 * may still run for a while after calling this method. This cancel method can be called multiple times.
 * @param clientQueryId the id assigned to the query by the client
 * @param timeoutMs timeout to wait for servers to respond to the cancel requests
 * @param executor to send cancel requests to servers in parallel
 * @param connMgr to provide the http connections
 * @param serverResponses to collect cancel responses from all servers if a map is provided
 * @return true if there is a running query for the given clientQueryId.
 */
boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,37 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String

/**
 * Returns the running queries of the single-stage engine and, when a multi-stage handler is configured, of the
 * multi-stage engine as well, keyed by broker-generated query id.
 *
 * @return a new mutable map owned by the caller
 */
@Override
public Map<Long, String> getRunningQueries() {
  // Both engines share the same request Id generator, so the query will have unique ids across the two engines.
  // Merge into a fresh map: the handlers may return a read-only view, on which putAll would throw
  // UnsupportedOperationException.
  Map<Long, String> queries = new java.util.HashMap<>(_singleStageBrokerRequestHandler.getRunningQueries());
  if (_multiStageBrokerRequestHandler != null) {
    queries.putAll(_multiStageBrokerRequestHandler.getRunningQueries());
  }
  return queries;
}

@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
// Try the multi-stage handler first (when one is configured); if it does not report the query as cancelled,
// fall back to the single-stage handler.
if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQuery(
queryId, timeoutMs, executor, connMgr, serverResponses)) {
return true;
}
return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
}

/**
 * Cancels a query by its client-provided id, asking the multi-stage handler first (when one is configured) and
 * falling back to the single-stage handler when the former does not report a cancellation.
 */
@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
    HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
    throws Exception {
  boolean cancelledOnMultiStage = _multiStageBrokerRequestHandler != null
      && _multiStageBrokerRequestHandler.cancelQueryByClientId(
          clientQueryId, timeoutMs, executor, connMgr, serverResponses);
  // Short-circuit keeps the single-stage handler untouched once the multi-stage one has cancelled the query.
  return cancelledOnMultiStage
      || _singleStageBrokerRequestHandler.cancelQueryByClientId(
          clientQueryId, timeoutMs, executor, connMgr, serverResponses);
}

private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response)
throws Exception {
if (numRows == null) {
Expand Down
Loading

0 comments on commit f1509f8

Please sign in to comment.