diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index 44da5f962d32..cec17c240eec 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -380,38 +380,48 @@ public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspe } @DELETE - @Path("query/{queryId}") + @Path("query/{id}") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the " - + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as " + @ApiOperation(value = "Cancel a query as identified by the id", notes = "No effect if no query exists for the " + + "given id on the requested broker. Query may continue to run for a short while after calling cancel as " + "it's done in a non-blocking manner. The cancel method can be called multiple times.") @ApiResponses(value = { @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 404, message = "Query not found on the requested broker") }) public String cancelQuery( - @ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId, + @ApiParam(value = "Query id", required = true) @PathParam("id") String id, + @ApiParam(value = "Determines if the query id is internal or provided by the client") @QueryParam("client") + @DefaultValue("false") boolean isClient, @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") @DefaultValue("3000") int timeoutMs, @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") boolean verbose) { try { Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null; - if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) { - String resp = "Cancelled query: " + queryId; + if (isClient && _requestHandler.cancelQueryByClientId(id, timeoutMs, _executor, _httpConnMgr, serverResponses)) { + String resp = "Cancelled client query: " + id; if (verbose) { resp += " with responses from servers: " + serverResponses; } return resp; + } else if (_requestHandler.cancelQuery(Long.parseLong(id), timeoutMs, _executor, _httpConnMgr, serverResponses)) { + String resp = "Cancelled query: " + id; + if (verbose) { + resp += " with responses from servers: " + serverResponses; + } + return resp; } + } catch (NumberFormatException e) { + throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST) + .entity(String.format("Invalid internal query id: %s", id)).build()); } catch (Exception e) { throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(String.format("Failed to cancel query: %s on the broker due to error: %s", queryId, e.getMessage())) + .entity(String.format("Failed to cancel query: %s on the broker due to error: %s", id, e.getMessage())) .build()); } throw new WebApplicationException( - Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", queryId)) + Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", id)) .build()); }
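The endpoint above now serves both id kinds: the broker-generated numeric id (the pre-existing behavior) and a client-provided id when client=true is passed. A minimal caller sketch, assuming a broker on localhost:8099 and a query previously tagged with SET clientQueryId='myQuery123' (host, port and id are illustrative, not part of this patch):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CancelByClientIdExample {
  public static void main(String[] args) throws Exception {
    // client=true tells the broker to resolve the id against client-provided ids
    // instead of parsing it as a broker-generated long.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8099/query/myQuery123?client=true&verbose=true"))
        .DELETE()
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // 200 -> cancelled; 404 -> no running query with that id on this broker.
    System.out.println(response.statusCode() + ": " + response.body());
  }
}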
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 9a5e0e94a487..11c63c699510 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -19,11 +19,16 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import javax.ws.rs.WebApplicationException; @@ -31,6 +36,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.pinot.broker.api.AccessControl; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.broker.broker.AccessControlFactory; @@ -51,6 +57,7 @@ import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory; import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.slf4j.Logger; @@ -74,6 +81,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final QueryLogger _queryLogger; @Nullable protected final String _enableNullHandling; + /** + * Maps broker-generated query id to the query string. + */ + protected final Map<Long, String> _queriesById; + /** + * Maps broker-generated query id to client-provided query id. + */ + protected final Map<Long, String> _clientQueryIds; public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) { @@ -90,6 +105,16 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS); _queryLogger = new QueryLogger(config); _enableNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING); + + boolean enableQueryCancellation = + Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); + if (enableQueryCancellation) { + _queriesById = new ConcurrentHashMap<>(); + _clientQueryIds = new ConcurrentHashMap<>(); + } else { + _queriesById = null; + _clientQueryIds = null; + } } @Override @@ -179,6 +204,13 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq @Nullable HttpHeaders httpHeaders, AccessControl accessControl) throws Exception; + /** + * Attempts to cancel an ongoing query identified by its broker-generated id. + * @return true if the query was successfully cancelled, false otherwise. + */ + protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception; + protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) { statistics.setNumRowsResultSet(response.getNumRowsResultSet()); // TODO: Add partial result flag to RequestContext @@ -223,4 +255,66 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments()); statistics.setTraceInfo(response.getTraceInfo()); } + + @Override + public Map<Long, String> getRunningQueries() { + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + return Collections.unmodifiableMap(_queriesById); + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + Map<String, Integer> serverResponses) + throws Exception { + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + try { + return handleCancel(queryId, timeoutMs, executor, connMgr, serverResponses); + } finally { + onQueryFinish(queryId); + } + } + + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) + throws Exception { + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + Optional<Long> requestId = _clientQueryIds.entrySet().stream() + .filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst(); + if (requestId.isPresent()) { + return cancelQuery(requestId.get(), timeoutMs, executor, connMgr, serverResponses); + } else { + LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); + return false; + } + } + + protected String extractClientRequestId(SqlNodeAndOptions sqlNodeAndOptions) { + return sqlNodeAndOptions.getOptions() != null + ? sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID) : null; + } + + protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) { + if (isQueryCancellationEnabled()) { + _queriesById.put(requestId, query); + if (StringUtils.isNotBlank(clientRequestId)) { + _clientQueryIds.put(requestId, clientRequestId); + LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId); + } else { + LOGGER.debug("Keep track of running query: {}", requestId); + } + } + } + + protected void onQueryFinish(long requestId) { + if (isQueryCancellationEnabled()) { + _queriesById.remove(requestId); + _clientQueryIds.remove(requestId); + LOGGER.debug("Remove track of running query: {}", requestId); + } + } + + protected boolean isQueryCancellationEnabled() { + return _queriesById != null; + } }
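The reverse lookup in cancelQueryByClientId above is a linear scan over _clientQueryIds, which keeps the hot path (onQueryStart/onQueryFinish) at two ConcurrentHashMap writes per query and stays cheap while the number of in-flight queries is small. A hypothetical alternative (the class and field names below are illustrative, not part of this patch) would maintain an inverted index for an O(1) lookup at the cost of one extra write:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: trade one extra write per query for O(1) cancel-by-client-id.
public class ClientIdIndexSketch {
  private final Map<Long, String> _clientQueryIds = new ConcurrentHashMap<>();
  private final Map<String, Long> _requestIdsByClientId = new ConcurrentHashMap<>();

  public void onQueryStart(long requestId, String clientQueryId) {
    _clientQueryIds.put(requestId, clientQueryId);
    _requestIdsByClientId.put(clientQueryId, requestId);
  }

  public void onQueryFinish(long requestId) {
    String clientQueryId = _clientQueryIds.remove(requestId);
    if (clientQueryId != null) {
      _requestIdsByClientId.remove(clientQueryId);
    }
  }

  public Long lookup(String clientQueryId) {
    // Replaces the entrySet() stream scan with a single map read.
    return _requestIdsByClientId.get(clientQueryId);
  }
}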
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index ee36d4196151..bd0bdca945ee 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -38,7 +38,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import javax.ws.rs.WebApplicationException; @@ -142,9 +141,10 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ protected final boolean _enableQueryLimitOverride; protected final boolean _enableDistinctCountBitmapOverride; protected final int _queryResponseLimit; + // maps broker-generated query id to the servers that are running the query + protected final Map<Long, QueryServers> _serversById; // if >= 0, then overrides default limit of 10, otherwise setting is ignored protected final int _defaultQueryLimit; - protected final Map<Long, QueryServers> _queriesById; protected final boolean _enableMultistageMigrationMetric; protected ExecutorService _multistageCompileExecutor; protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue; @@ -162,11 +162,15 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro _config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false); _queryResponseLimit = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT); + if (this.isQueryCancellationEnabled()) { + _serversById = new ConcurrentHashMap<>(); + } else { + _serversById = null; + } _defaultQueryLimit = config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT, Broker.DEFAULT_BROKER_QUERY_LIMIT); boolean enableQueryCancellation = Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); - _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null; _enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC); @@ -215,31 +219,33 @@ public void shutDown() { } } - @Override - public Map<Long, String> getRunningQueries() { - Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); - return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); - } - @VisibleForTesting Set<ServerInstance> getRunningServers(long requestId) { - Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); - QueryServers queryServers = _queriesById.get(requestId); + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + QueryServers queryServers = _serversById.get(requestId); return queryServers != null ? queryServers._servers : Collections.emptySet(); } @Override - public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + protected void onQueryFinish(long requestId) { + super.onQueryFinish(requestId); + if (isQueryCancellationEnabled()) { + _serversById.remove(requestId); + } + } + + @Override + protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception { - Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); - QueryServers queryServers = _queriesById.get(requestId); + QueryServers queryServers = _serversById.get(queryId); if (queryServers == null) { return false; } + // TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for // details - String globalQueryId = getGlobalQueryId(requestId); + String globalQueryId = getGlobalQueryId(queryId); List<Pair<String, String>> serverUrls = new ArrayList<>(); for (ServerInstance serverInstance : queryServers._servers) { serverUrls.add(Pair.of(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId), null));
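handleCancel above fans the cancel out to every server still working on the query, addressing each server's admin endpoint with the broker-global query id. A sketch of the resulting URL, assuming getGlobalQueryId concatenates broker id and request id with an underscore (all values below are illustrative):

public class CancelUrlSketch {
  public static void main(String[] args) {
    String brokerId = "Broker_localhost_8099";       // illustrative broker id
    long queryId = 42L;
    String globalQueryId = brokerId + "_" + queryId; // assumed shape of getGlobalQueryId(queryId)
    String adminEndpoint = "http://server-1:8097";   // ServerInstance.getAdminEndpoint(), illustrative
    // Each entry becomes an HTTP DELETE handled by the server's /query/{queryId} admin resource.
    System.out.println(String.format("%s/query/%s", adminEndpoint, globalQueryId));
    // -> http://server-1:8097/query/Broker_localhost_8099_42
  }
}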
@@ -807,7 +813,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } } BrokerResponseNative brokerResponse; - if (_queriesById != null) { + if (isQueryCancellationEnabled()) { // Start to track the running query for cancellation just before sending it out to servers to avoid any // potential failures that could happen before sending it out, like failures to calculate the routing table etc. // TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and // can always list the running queries and cancel query again until it ends. Just that such race // condition makes cancel API less reliable. This should be rare as it assumes sending queries out to // servers takes time, but will address later if needed. - _queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); - LOGGER.debug("Keep track of running query: {}", requestId); + String clientRequestId = extractClientRequestId(sqlNodeAndOptions); + onQueryStart( + requestId, clientRequestId, query, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); try { brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); + brokerResponse.setClientRequestId(clientRequestId); } finally { - _queriesById.remove(requestId); + onQueryFinish(requestId); LOGGER.debug("Remove track of running query: {}", requestId); } } else { @@ -922,6 +930,14 @@ static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRouti return errorMessage; } + @Override + protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) { + super.onQueryStart(requestId, clientRequestId, query, extras); + if (isQueryCancellationEnabled() && extras.length > 0 && extras[0] instanceof QueryServers) { + _serversById.put(requestId, (QueryServers) extras[0]); + } + } + private static String getRoutingPolicy(TableConfig tableConfig) { RoutingConfig routingConfig = tableConfig.getRoutingConfig(); if (routingConfig == null) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java index 277f5f96df63..710cc68b182a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java @@ -83,4 +83,19 @@ default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, Strin boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception; + + /** + * Cancel a query as identified by the clientQueryId provided externally. This method is non-blocking, so the query + * may still run for a while after calling this method. This cancel method can be called multiple times. + * @param clientQueryId the id assigned to the query by the client + * @param timeoutMs timeout to wait for servers to respond to the cancel requests + * @param executor to send cancel requests to servers in parallel + * @param connMgr to provide the http connections + * @param serverResponses to collect cancel responses from all servers if a map is provided + * @return true if there is a running query for the given clientQueryId. + */ + boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, + Map<String, Integer> serverResponses) + throws Exception; }
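For consumers of the interface, cancellation by client id is best-effort and non-blocking, mirroring cancelQuery. A sketch of a caller with its dependencies injected (the wrapper class is illustrative; the broker's REST resource passes its shared executor and connection manager):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;

public class CancelByClientIdCaller {
  public static String cancel(BrokerRequestHandler handler, String clientQueryId, Executor executor,
      HttpClientConnectionManager connMgr) throws Exception {
    Map<String, Integer> serverResponses = new HashMap<>();
    return handler.cancelQueryByClientId(clientQueryId, 3000, executor, connMgr, serverResponses)
        ? "Cancelled client query: " + clientQueryId + " with responses from servers: " + serverResponses
        : "Query not found";
  }
}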
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index 561e79abb4fe..03965b6fef69 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -133,21 +133,37 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String @Override public Map<Long, String> getRunningQueries() { - // TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its - // running queries with those from singleStaged engine. Both engines share the same request Id generator, so - // the query will have unique ids across the two engines. - return _singleStageBrokerRequestHandler.getRunningQueries(); + // Both engines share the same request Id generator, so the query will have unique ids across the two engines. + // Copy into a local map: the single-stage handler returns an unmodifiable view, so putAll on it would throw. + Map<Long, String> queries = new HashMap<>(_singleStageBrokerRequestHandler.getRunningQueries()); + if (_multiStageBrokerRequestHandler != null) { + queries.putAll(_multiStageBrokerRequestHandler.getRunningQueries()); + } + return queries; } @Override public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception { - // TODO: add support for multiStaged engine, basically try to cancel the query on multiStaged engine firstly; if - // not found, try on the singleStaged engine. + if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQuery( + queryId, timeoutMs, executor, connMgr, serverResponses)) { + return true; + } return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses); } + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) + throws Exception { + if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQueryByClientId( + clientQueryId, timeoutMs, executor, connMgr, serverResponses)) { + return true; + } + return _singleStageBrokerRequestHandler.cancelQueryByClientId( + clientQueryId, timeoutMs, executor, connMgr, serverResponses); + } + private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response) throws Exception { if (numRows == null) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 642ea7039766..26b76f969042 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -113,10 +113,12 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED, CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? 
TlsUtils.extractTlsConfig(config, CommonConstants.Broker.BROKER_TLS_PREFIX) : null; - _queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config, tlsConfig), tlsConfig); + _queryDispatcher = new QueryDispatcher( + new MailboxService(hostname, port, config, tlsConfig), tlsConfig, this.isQueryCancellationEnabled()); LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, " - + "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs, - _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit()); + + "query log max length: {}, query log max rate: {}, query cancellation enabled: {}", hostname, port, + _brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), + this.isQueryCancellationEnabled()); _explainAskingServerDefault = _config.getProperty( CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN, CommonConstants.MultiStageQueryRunner.DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN); @@ -294,6 +296,9 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); } + String clientRequestId = extractClientRequestId(sqlNodeAndOptions); + onQueryStart(requestId, clientRequestId, query); + try { Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId), ThreadExecutionContext.TaskType.MSE); @@ -326,12 +331,14 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO QueryException.getException(queryException, consolidatedMessage)); } finally { Tracing.getThreadAccountant().clear(); + onQueryFinish(requestId); } long executionEndTimeNs = System.nanoTime(); updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, executionEndTimeNs - executionStartTimeNs); BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + brokerResponse.setClientRequestId(clientRequestId); brokerResponse.setResultTable(queryResults.getResultTable()); brokerResponse.setTablesQueried(tableNames); brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs()); @@ -512,16 +519,9 @@ private BrokerResponse constructMultistageExplainPlan(String sql, String plan) { } @Override - public Map getRunningQueries() { - // TODO: Support running query tracking for multi-stage engine - throw new UnsupportedOperationException(); - } - - @Override - public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map serverResponses) { - // TODO: Support query cancellation for multi-stage engine - throw new UnsupportedOperationException(); + return _queryDispatcher.cancel(queryId); } /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java index ac6962c5929e..f1d3564c2ec0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java @@ -82,6 +82,13 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S throw new IllegalArgumentException("Not supported yet"); } + @Override + protected 
boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + Map<String, Integer> serverResponses) + throws Exception { + throw new IllegalArgumentException("Not supported yet"); + } + @Override public void start() { LOGGER.info("Starting time-series request handler"); @@ -141,6 +148,14 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC return false; } + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) + throws Exception { + // TODO: Implement this. + return false; + } + private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString) throws URISyntaxException { List<NameValuePair> pairs = URLEncodedUtils.parse( diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java index 60b6991733c8..c1445e98bde7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java @@ -216,4 +216,12 @@ public static RelDataType getRelDataType(RelDataTypeFactory typeFactory, Class 0 ? portOverride : Integer.parseInt(broker.getPort()); + HttpDelete deleteMethod = new HttpDelete(String.format( + "%s://%s:%d/clientQuery/%s?verbose=%b", + protocol, broker.getHostName(), port, clientQueryId, verbose)); + Map<String, String> requestHeaders = createRequestHeaders(httpHeaders); + requestHeaders.forEach(deleteMethod::setHeader); + try (CloseableHttpResponse response = client.execute(deleteMethod)) { + int status = response.getCode(); + String responseContent = EntityUtils.toString(response.getEntity()); + if (status == 200) { + return responseContent; + } + if (status == 404) { + throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND) + .entity(String.format("Client query: %s not found on the broker: %s", clientQueryId, brokerId)).build()); + } + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity( + String.format("Failed to cancel client query: %s on the broker: %s with unexpected status=%d and resp='%s'", + clientQueryId, brokerId, status, responseContent)).build()); + } + } catch (WebApplicationException e) { + throw e; + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity( + String.format("Failed to cancel client query: %s on the broker: %s due to error: %s", clientQueryId, brokerId, + e.getMessage())).build()); + } + } + + @DELETE + @Path("clientQuery/{clientQueryId}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for " + + "the given clientQueryId on any broker. Query may continue to run for a short while after calling " + + "cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 404, message = "Query not found on any broker") + }) + public String cancelClientQuery( + @ApiParam(value = "ClientQueryId provided by the client", required = true) + @PathParam("clientQueryId") String clientQueryId, + @ApiParam(value = "Timeout for servers to respond to the cancel request") @QueryParam("timeoutMs") + @DefaultValue("3000") int timeoutMs, + @ApiParam(value = "Return verbose responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") + boolean verbose, @Context HttpHeaders httpHeaders) { + try { + Timeout timeout = Timeout.of(timeoutMs, TimeUnit.MILLISECONDS); + RequestConfig defaultRequestConfig = + RequestConfig.custom().setConnectionRequestTimeout(timeout).setResponseTimeout(timeout).build(); + CloseableHttpClient client = + HttpClients.custom().setConnectionManager(_httpConnMgr).setDefaultRequestConfig(defaultRequestConfig).build(); + + String protocol = _controllerConf.getControllerBrokerProtocol(); + int portOverride = _controllerConf.getControllerBrokerPortOverride(); + + Map<String, String> requestHeaders = createRequestHeaders(httpHeaders); + List<HttpDelete> brokerDeletes = new ArrayList<>(); + for (InstanceInfo broker: getBrokers(httpHeaders.getHeaderString(DATABASE)).values()) { + int port = portOverride > 0 ? portOverride : broker.getPort(); + HttpDelete delete = new HttpDelete(String.format( + "%s://%s:%d/query/%s?client=true&verbose=%b", protocol, broker.getHost(), port, clientQueryId, verbose)); + requestHeaders.forEach(delete::setHeader); + brokerDeletes.add(delete); + } + + if (brokerDeletes.isEmpty()) { + throw new WebApplicationException( + Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("No available brokers").build()); + } + + Set<Integer> statusCodes = new HashSet<>(); + for (HttpDelete delete: brokerDeletes) { + try (CloseableHttpResponse response = client.execute(delete)) { + int status = response.getCode(); + String responseContent = EntityUtils.toString(response.getEntity()); + if (status == 200) { + return responseContent; + } else { + statusCodes.add(status); + } + } + } + + if (statusCodes.size() == 1 && statusCodes.iterator().next() == 404) { + throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND) + .entity(String.format("Client query: %s not found on any broker", clientQueryId)).build()); + } + + statusCodes.remove(404); + int status = statusCodes.iterator().next(); + throw new Exception( + String.format("Unexpected status=%d", status)); + } catch (WebApplicationException e) { + throw e; + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity( + String.format("Failed to cancel client query: %s due to error: %s", clientQueryId, e.getMessage())).build()); + } + } + @GET @Path("/queries") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RUNNING_QUERY) @@ -173,10 +311,7 @@ public Map<String, Map<Long, String>> getRunningQueries( @ApiParam(value = "Timeout for brokers to return running queries") @QueryParam("timeoutMs") @DefaultValue("3000") int timeoutMs, @Context HttpHeaders httpHeaders) { try { - Map<String, List<InstanceInfo>> tableBrokers = - _pinotHelixResourceManager.getTableToLiveBrokersMapping(httpHeaders.getHeaderString(DATABASE)); - Map<String, InstanceInfo> brokers = new HashMap<>(); - tableBrokers.values().forEach(list -> list.forEach(info -> brokers.putIfAbsent(getInstanceKey(info), info))); + Map<String, InstanceInfo> 
brokers = getBrokers(httpHeaders.getHeaderString(DATABASE)); return getRunningQueries(brokers, timeoutMs, createRequestHeaders(httpHeaders)); } catch (Exception e) { throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) @@ -247,4 +382,12 @@ private static Map createRequestHeaders(HttpHeaders httpHeaders) }); return requestHeaders; } + + private Map getBrokers(String database) { + Map> tableBrokers = + _pinotHelixResourceManager.getTableToLiveBrokersMapping(database); + Map brokers = new HashMap<>(); + tableBrokers.values().forEach(list -> list.forEach(info -> brokers.putIfAbsent(getInstanceKey(info), info))); + return brokers; + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java index 61d62e45318e..7d8d022d20d9 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator; import org.apache.pinot.spi.data.readers.GenericRow; import org.testng.Assert; @@ -40,6 +41,13 @@ private void testFunction(String functionExpression, List expectedArgume Assert.assertEquals(evaluator.evaluate(row), expectedResult); } + private void testFunction(String functionExpression, List expectedArguments, GenericRow row, + Consumer assertResult) { + InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(functionExpression); + Assert.assertEquals(evaluator.getArguments(), expectedArguments); + assertResult.accept(evaluator.evaluate(row)); + } + @Test(dataProvider = "arithmeticFunctionsDataProvider") public void testArithmeticFunctions(String functionExpression, List expectedArguments, GenericRow row, Object expectedResult) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java index 190142e65d3f..0f9c4d8761f6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Consumer; import org.apache.pinot.common.function.scalar.DateTimeFunctions; import org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator; import org.apache.pinot.spi.data.readers.GenericRow; @@ -36,6 +37,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.AssertJUnit.assertTrue; /** @@ -54,6 +56,13 @@ private void testFunction(String functionExpression, List expectedArgume assertEquals(evaluator.evaluate(row), expectedResult); } + private void testFunction(String functionExpression, List expectedArguments, GenericRow row, + Consumer assertResult) { + InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(functionExpression); + assertEquals(evaluator.getArguments(), expectedArguments); + assertResult.accept(evaluator.evaluate(row)); + } + private void testDateFunction(String functionExpression, List expectedArguments, GenericRow row, Object 
expectedResult) { InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(functionExpression); @@ -778,4 +787,14 @@ public void testDateTimeConvertMultipleInvocations() { testMultipleInvocations(String.format("dateTimeConvert(timeCol, '%s', '%s', '%s')", inputFormatStr, outputFormatStr, outputGranularityStr), rows, expectedResults); } + + @Test + public void testSleepFunction() { + long startTime = System.currentTimeMillis(); + testFunction("sleep(50)", Collections.emptyList(), new GenericRow(), result -> { + assertTrue((long) result >= 50); + }); + long endTime = System.currentTimeMillis(); + assertTrue(endTime - startTime >= 50); + } } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index 299d60c75496..a54fcd2ba0bf 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -873,6 +873,10 @@ public static String getBrokerQueryApiUrl(String brokerBaseApiUrl, boolean useMu return useMultiStageQueryEngine ? brokerBaseApiUrl + "/query" : brokerBaseApiUrl + "/query/sql"; } + public static String getBrokerQueryCancelUrl(String brokerBaseApiUrl, String brokerId, String clientQueryId) { + return brokerBaseApiUrl + "/clientQuery/" + brokerId + "/" + clientQueryId; + } + private static int getH2ExpectedValues(Set expectedValues, List expectedOrderByValues, ResultSet h2ResultSet, ResultSetMetaData h2MetaData, Collection orderByColumns) throws SQLException { diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index d2b4db8a1eca..85b1383aaade 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -565,6 +565,13 @@ public JsonNode postQueryToController(String query) return postQueryToController(query, getControllerBaseApiUrl(), null, getExtraQueryPropertiesForController()); } + public JsonNode cancelQuery(String clientQueryId) + throws Exception { + URI cancelURI = URI.create(getControllerRequestURLBuilder().forCancelQueryByClientId(clientQueryId)); + Object o = _httpClient.sendDeleteRequest(cancelURI); + return null; // TODO + } + private Map getExtraQueryPropertiesForController() { if (!useMultiStageQueryEngine()) { return Map.of(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java new file mode 100644 index 000000000000..913ad2984be3 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Timer; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.common.utils.ServiceStatus; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +/** + * Integration test that checks the query cancellation feature. + */ +public class CancelQueryIntegrationTests extends BaseClusterIntegrationTestSet { + private static final int NUM_BROKERS = 1; + private static final int NUM_SERVERS = 4; + + private final List _serviceStatusCallbacks = + new ArrayList<>(getNumBrokers() + getNumServers()); + + protected int getNumBrokers() { + return NUM_BROKERS; + } + + protected int getNumServers() { + return NUM_SERVERS; + } + + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); + brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION, "true"); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + super.overrideServerConf(serverConf); + serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION, "true"); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + // Set hyperloglog log2m value to 12. + HelixConfigScope scope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName()) + .build(); + _helixManager.getConfigAccessor() + .set(scope, CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(12)); + startBrokers(getNumBrokers()); + startServers(getNumServers()); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments. 
For exhaustive testing, concurrently upload multiple segments with the same name + // and validate correctness with parallel push protection enabled. + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + // Create a copy of _tarDir to create multiple segments with the same name. + File tarDir2 = new File(_tempDir, "tarDir2"); + FileUtils.copyDirectory(_tarDir, tarDir2); + + List tarDirs = new ArrayList<>(); + tarDirs.add(_tarDir); + tarDirs.add(tarDir2); + try { + uploadSegments(getTableName(), TableType.OFFLINE, tarDirs); + } catch (Exception e) { + // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one + // of the three exception: + // - 409 conflict of the second call enters ProcessExistingSegment + // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment + // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently + // In such cases we upload all the segments again to ensure that the data is set up correctly. + assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage() + .contains("Failed to create ZK metadata for segment") || e.getMessage() + .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage()); + uploadSegments(getTableName(), _tarDir); + } + + // Set up the H2 connection + setUpH2Connection(avroFiles); + + // Initialize the query generator + setUpQueryGenerator(avroFiles); + + // Set up service status callbacks + // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the + // resources to monitor + registerCallbackHandlers(); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + private void registerCallbackHandlers() { + List instances = _helixAdmin.getInstancesInCluster(getHelixClusterName()); + instances.removeIf( + instanceId -> !InstanceTypeUtils.isBroker(instanceId) && !InstanceTypeUtils.isServer(instanceId)); + List resourcesInCluster = _helixAdmin.getResourcesInCluster(getHelixClusterName()); + resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource) + && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource))); + for (String instance : instances) { + List resourcesToMonitor = new ArrayList<>(); + for (String resourceName : resourcesInCluster) { + IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName); + for (String partitionName : idealState.getPartitionSet()) { + if (idealState.getInstanceSet(partitionName).contains(instance)) { + resourcesToMonitor.add(resourceName); + break; + } + } + } + _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of( + new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, getHelixClusterName(), + instance, resourcesToMonitor, 100.0), + new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, getHelixClusterName(), + instance, resourcesToMonitor, 100.0)))); + } + } + + @Test + public void testInstancesStarted() { + assertEquals(_serviceStatusCallbacks.size(), getNumBrokers() + getNumServers()); + for (ServiceStatus.ServiceStatusCallback serviceStatusCallback : _serviceStatusCallbacks) { + assertEquals(serviceStatusCallback.getServiceStatus(), ServiceStatus.Status.GOOD); + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void 
testCancelByClientQueryId(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String clientRequestId = UUID.randomUUID().toString(); + // tricky query: feed sleep() with column data to prevent Calcite from optimizing it away at compile time + String sqlQuery = + "SET clientQueryId='" + clientRequestId + "'; " + + "SELECT sleep(ActualElapsedTime+60000) FROM mytable WHERE ActualElapsedTime > 0 limit 1"; + + new Timer().schedule(new java.util.TimerTask() { + @Override + public void run() { + try { + sendCancel(clientRequestId); + } catch (Exception e) { + fail("No exception should be thrown", e); + } + } + }, 500); + + JsonNode result = postQuery(sqlQuery); + // ugly: error message differs from SSQE to MSQE + assertQueryCancellation(result, useMultiStageQueryEngine ? "InterruptedException" : "QueryCancellationError"); + } + + private void sendCancel(String clientRequestId) + throws Exception { + cancelQuery(clientRequestId); + } + + private void assertQueryCancellation(JsonNode result, String errorText) { + assertNotNull(result); + JsonNode exceptions = result.get("exceptions"); + assertNotNull(exceptions); + assertTrue(exceptions.isArray()); + assertFalse(exceptions.isEmpty()); + for (JsonNode exception: exceptions) { + JsonNode message = exception.get("message"); + if (message != null && message.asText().contains(errorText)) { + return; + } + } + fail("At least one exception containing '" + errorText + "' expected."); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index a981a930ce41..24349eb78756 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -66,6 +66,7 @@ protected int getNumReplicas() { @Override protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); brokerConf.setProperty(FailureDetector.CONFIG_OF_TYPE, FailureDetector.Type.CONNECTION.name()); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 32b7a2463cb6..fb1afe08ab1c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; @@ -79,6 +80,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.JsonUtils; @@ -179,6 +181,18 @@ protected List getFieldConfigs() { CompressionCodec.MV_ENTRY_DICT, null)); } + @Override + protected void overrideBrokerConf(PinotConfiguration 
brokerConf) { + super.overrideBrokerConf(brokerConf); + brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION, "true"); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + super.overrideServerConf(serverConf); + serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION, "true"); + } + @BeforeClass public void setUp() throws Exception { @@ -3868,4 +3882,18 @@ public void testFastFilteredCountWithOrFilterOnBitmapWithExclusiveBitmap(boolean "SELECT COUNT(*) FROM mytable WHERE Origin BETWEEN 'ALB' AND 'LMT' OR DayofMonth <> 2" ); } + + @Test(dataProvider = "useBothQueryEngines") + public void testResponseWithClientRequestId(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String clientRequestId = UUID.randomUUID().toString(); + String sqlQuery = + "SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; " + + "SELECT AirlineID FROM mytable LIMIT 1"; + JsonNode result = postQuery(sqlQuery); + assertNoError(result); + + assertEquals(result.get("clientRequestId").asText(), clientRequestId); + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 4477cccedc13..e3c8d07ef4a1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -107,18 +107,25 @@ public class QueryDispatcher { private final Map _timeSeriesDispatchClientMap = new ConcurrentHashMap<>(); @Nullable private final TlsConfig _tlsConfig; + // maps broker-generated query id to the set of servers that the query was dispatched to + private final Map> _serversByQuery; private final PhysicalTimeSeriesBrokerPlanVisitor _timeSeriesBrokerPlanVisitor = new PhysicalTimeSeriesBrokerPlanVisitor(); public QueryDispatcher(MailboxService mailboxService) { - this(mailboxService, null); + this(mailboxService, null, false); } - public QueryDispatcher(MailboxService mailboxService, @Nullable TlsConfig tlsConfig) { + public QueryDispatcher(MailboxService mailboxService, @Nullable TlsConfig tlsConfig, boolean enableCancellation) { _mailboxService = mailboxService; _executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(), new TracedThreadFactory(Thread.NORM_PRIORITY, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT)); _tlsConfig = tlsConfig; + if (enableCancellation) { + _serversByQuery = new ConcurrentHashMap<>(); + } else { + _serversByQuery = null; + } } public void start() { @@ -129,13 +136,13 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d Map queryOptions) throws Exception { long requestId = context.getRequestId(); - List plans = dispatchableSubPlan.getQueryStageList(); + Set servers = new HashSet<>(); try { - submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions); + submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions); return runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService); } catch (Throwable e) { // TODO: Consider always cancel when it returns (early terminate) - cancel(requestId, plans); + cancel(requestId, servers); throw e; } } @@ -147,11 +154,13 @@ public List explain(RequestContext context, 
DispatchablePlanFragment f List planNodes = new ArrayList<>(); List plans = Collections.singletonList(fragment); + Set servers = new HashSet<>(); try { SendRequest> requestSender = DispatchClient::explain; - execute(requestId, plans, timeoutMs, queryOptions, requestSender, (responses, serverInstance) -> { + execute(requestId, plans, timeoutMs, queryOptions, requestSender, servers, (responses, serverInstance) -> { for (Worker.ExplainResponse response : responses) { if (response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR)) { + cancel(requestId, servers); throw new RuntimeException( String.format("Unable to explain query plan for request: %d on server: %s, ERROR: %s", requestId, serverInstance, @@ -164,6 +173,7 @@ public List explain(RequestContext context, DispatchablePlanFragment f Plan.PlanNode planNode = Plan.PlanNode.parseFrom(rootNode); planNodes.add(PlanNodeDeserializer.process(planNode)); } catch (InvalidProtocolBufferException e) { + cancel(requestId, servers); throw new RuntimeException("Failed to parse explain plan node for request " + requestId + " from server " + serverInstance, e); } @@ -172,20 +182,24 @@ public List explain(RequestContext context, DispatchablePlanFragment f }); } catch (Throwable e) { // TODO: Consider always cancel when it returns (early terminate) - cancel(requestId, plans); + cancel(requestId, servers); throw e; } return planNodes; } @VisibleForTesting - void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map queryOptions) + void submit( + long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Set serversOut, + Map queryOptions) throws Exception { SendRequest requestSender = DispatchClient::submit; List stagePlans = dispatchableSubPlan.getQueryStageList(); List plansWithoutRoot = stagePlans.subList(1, stagePlans.size()); - execute(requestId, plansWithoutRoot, timeoutMs, queryOptions, requestSender, (response, serverInstance) -> { + execute(requestId, plansWithoutRoot, timeoutMs, queryOptions, requestSender, serversOut, + (response, serverInstance) -> { if (response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR)) { + cancel(requestId, serversOut); throw new RuntimeException( String.format("Unable to execute query plan for request: %d on server: %s, ERROR: %s", requestId, serverInstance, @@ -193,19 +207,26 @@ void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeou "null"))); } }); + if (isQueryCancellationEnabled()) { + _serversByQuery.put(requestId, serversOut); + } + } + + private boolean isQueryCancellationEnabled() { + return _serversByQuery != null; } - private void execute(long requestId, List stagePlans, long timeoutMs, - Map queryOptions, SendRequest sendRequest, BiConsumer resultConsumer) + private void execute(long requestId, List stagePlans, + long timeoutMs, Map queryOptions, + SendRequest sendRequest, Set serverInstancesOut, + BiConsumer resultConsumer) throws ExecutionException, InterruptedException, TimeoutException { Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS); - Set serverInstances = new HashSet<>(); + List stageInfos = serializePlanFragments(stagePlans, serverInstancesOut, deadline); - List stageInfos = serializePlanFragments(stagePlans, serverInstances, deadline); - - if (serverInstances.isEmpty()) { + if (serverInstancesOut.isEmpty()) { throw new RuntimeException("No server instances to dispatch query to"); } @@ -213,10 +234,10 @@ private void execute(long requestId, 
List stagePla ByteString protoRequestMetadata = QueryPlanSerDeUtils.toProtoProperties(requestMetadata); // Submit the query plan to all servers in parallel - int numServers = serverInstances.size(); + int numServers = serverInstancesOut.size(); BlockingQueue> dispatchCallbacks = new ArrayBlockingQueue<>(numServers); - for (QueryServerInstance serverInstance : serverInstances) { + for (QueryServerInstance serverInstance : serverInstancesOut) { Consumer> callbackConsumer = response -> { if (!dispatchCallbacks.offer(response)) { LOGGER.warn("Failed to offer response to dispatchCallbacks queue for query: {} on server: {}", requestId, @@ -361,20 +382,29 @@ private StageInfo(ByteString rootNode, ByteString customProperty) { } } - private void cancel(long requestId, List stagePlans) { - int numStages = stagePlans.size(); - // Skip the reduce stage (stage 0) - Set serversToCancel = new HashSet<>(); - for (int stageId = 1; stageId < numStages; stageId++) { - serversToCancel.addAll(stagePlans.get(stageId).getServerInstanceToWorkerIdMap().keySet()); + public boolean cancel(long requestId) { + if (isQueryCancellationEnabled()) { + return cancel(requestId, _serversByQuery.remove(requestId)); + } else { + return false; } - for (QueryServerInstance queryServerInstance : serversToCancel) { + } + + private boolean cancel(long requestId, @Nullable Set servers) { + if (servers == null) { + return false; + } + for (QueryServerInstance queryServerInstance : servers) { try { getOrCreateDispatchClient(queryServerInstance).cancel(requestId); } catch (Throwable t) { LOGGER.warn("Caught exception while cancelling query: {} on server: {}", requestId, queryServerInstance, t); } } + if (isQueryCancellationEnabled()) { + _serversByQuery.remove(requestId); + } + return true; } private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServerInstance) { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java index c21c40b2d9db..6f64791ec69f 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java @@ -22,8 +22,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -85,7 +87,8 @@ public void tearDown() { public void testQueryDispatcherCanSendCorrectPayload(String sql) throws Exception { DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); - _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, Collections.emptyMap()); + _queryDispatcher.submit( + REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, new HashSet<>(), Collections.emptyMap()); } @Test @@ -95,7 +98,8 @@ public void testQueryDispatcherThrowsWhenQueryServerThrows() { Mockito.doThrow(new RuntimeException("foo")).when(failingQueryServer).submit(Mockito.any(), Mockito.any()); DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); try { - _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, Collections.emptyMap()); + _queryDispatcher.submit( + 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index c21c40b2d9db..6f64791ec69f 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -22,8 +22,10 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -85,7 +87,8 @@ public void tearDown() {
   public void testQueryDispatcherCanSendCorrectPayload(String sql)
       throws Exception {
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
-    _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, Collections.emptyMap());
+    _queryDispatcher.submit(
+        REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, new HashSet<>(), Collections.emptyMap());
   }
 
   @Test
@@ -95,7 +98,8 @@ public void testQueryDispatcherThrowsWhenQueryServerThrows() {
     Mockito.doThrow(new RuntimeException("foo")).when(failingQueryServer).submit(Mockito.any(), Mockito.any());
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
     try {
-      _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, Collections.emptyMap());
+      _queryDispatcher.submit(
+          REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, new HashSet<>(), Collections.emptyMap());
       Assert.fail("Method call above should have failed");
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Error dispatching query"));
@@ -111,7 +115,7 @@ public void testQueryDispatcherCancelWhenQueryServerCallsOnError()
     Mockito.doAnswer(invocationOnMock -> {
       StreamObserver<Worker.Response> observer = invocationOnMock.getArgument(1);
       observer.onError(new RuntimeException("foo"));
-      return null;
+      return Set.of();
     }).when(failingQueryServer).submit(Mockito.any(), Mockito.any());
     long requestId = REQUEST_ID_GEN.getAndIncrement();
     RequestContext context = new DefaultRequestContext();
@@ -166,7 +170,8 @@ public void testQueryDispatcherThrowsWhenQueryServerCallsOnError() {
     }).when(failingQueryServer).submit(Mockito.any(), Mockito.any());
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
     try {
-      _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, Collections.emptyMap());
+      _queryDispatcher.submit(
+          REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, new HashSet<>(), Collections.emptyMap());
       Assert.fail("Method call above should have failed");
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Error dispatching query"));
@@ -187,7 +192,8 @@ public void testQueryDispatcherThrowsWhenQueryServerTimesOut() {
     }).when(failingQueryServer).submit(Mockito.any(), Mockito.any());
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
     try {
-      _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 200L, Collections.emptyMap());
+      _queryDispatcher.submit(
+          REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 200L, new HashSet<>(), Collections.emptyMap());
       Assert.fail("Method call above should have failed");
     } catch (Exception e) {
       String message = e.getMessage();
@@ -203,6 +209,7 @@ public void testQueryDispatcherThrowsWhenDeadlinePreExpiredAndAsyncResponseNotPo
       throws Exception {
     String sql = "SELECT * FROM a WHERE col1 = 'foo'";
     DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql);
-    _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 0L, Collections.emptyMap());
+    _queryDispatcher.submit(
+        REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 0L, new HashSet<>(), Collections.emptyMap());
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index bdf1cbed0f55..4e8e7c82e929 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -566,6 +566,9 @@ public static class QueryOptionKey {
       public static final String GET_CURSOR = "getCursor";
       // Number of rows that the cursor should contain
       public static final String CURSOR_NUM_ROWS = "cursorNumRows";
+
+      // Custom Query ID provided by the client
+      public static final String CLIENT_QUERY_ID = "clientQueryId";
     }
 
     public static class QueryOptionValue {
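
The CLIENT_QUERY_ID constant above rides on Pinot's existing query-option mechanism, so a client can tag a query
with its own id at submission time. A hedged sketch of what that might look like from the client side; the table
name and id value are made up, and the SET-statement prefix is Pinot's standard way of passing query options:

// Illustrative only: tag the query with a client-chosen id.
// The "clientQueryId" key matches the new QueryOptionKey.CLIENT_QUERY_ID constant.
String clientQueryId = "orders-dashboard-42";
String sql = "SET clientQueryId = '" + clientQueryId + "'; SELECT COUNT(*) FROM orders";
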
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 25415c7b5671..9c0ede26839f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -614,4 +614,8 @@ private static String encode(String s) {
   public String forSegmentUpload() {
     return StringUtil.join("/", _baseUrl, "v2/segments");
   }
+
+  public String forCancelQueryByClientId(String clientRequestId) {
+    return StringUtil.join("/", _baseUrl, "clientQuery", clientRequestId);
+  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
index 447544aa8723..bd8175f26fa7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java
@@ -294,6 +294,8 @@ public String[] getDefaultBatchTableDirectories() {
   protected Map<String, Object> getConfigOverrides() {
     Map<String, Object> configOverrides = new HashMap<>();
     configOverrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    configOverrides.put(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION, true);
+    configOverrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION, true);
     return configOverrides;
   }
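
Putting the pieces together: with the two quickstart overrides above enabling cancellation on broker and server, a
query tagged as in the previous sketch can be cancelled through the broker cancellation endpoint introduced earlier
in this patch (DELETE query/{id} with client=true). A hedged sketch using java.net.http; the broker address and
default port 8099 are illustrative assumptions, and the id is the one set via clientQueryId:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CancelByClientIdExample {
  public static void main(String[] args) throws Exception {
    // Assumes a local broker; client=true tells the broker to treat the id as client-provided.
    URI uri = URI.create("http://localhost:8099/query/orders-dashboard-42?client=true&verbose=true");
    HttpRequest cancel = HttpRequest.newBuilder(uri).DELETE().build();
    HttpResponse<String> response = HttpClient.newHttpClient().send(cancel, HttpResponse.BodyHandlers.ofString());
    // Expect 200 with "Cancelled client query: ..." on success, or 404 if no such query runs on this broker.
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
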