Enable query cancellation for MSQE + cancel using client-provided id #14823
}
String clientQueryId = extractClientQueryId(sqlNodeAndOptions);
if (StringUtils.isBlank(clientQueryId)) {
  return null;
(nit) In general, we don't recommend returning null as a coding practice.
Is ClientQueryID a new concept? Is it the same as requestID? How does the support added here improve the existing query cancellation (which is also exposed to users, IIRC)?
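One way to address the nit about returning null is to wrap the result in an Optional. The following is only a minimal sketch: it assumes the query options are available as a plain Map<String, String> and that the option key is named clientQueryId, whereas the method in the PR takes a SqlNodeAndOptions. The class name and the map-based signature are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

final class ClientQueryIdSketch {
  // Hypothetical option key, used here for illustration only.
  static final String CLIENT_QUERY_ID = "clientQueryId";

  // Returns the client-provided id, or Optional.empty() when it is missing or blank,
  // so callers never have to null-check the result.
  static Optional<String> extractClientQueryId(Map<String, String> queryOptions) {
    return Optional.ofNullable(queryOptions.get(CLIENT_QUERY_ID))
        .map(String::trim)
        .filter(id -> !id.isEmpty());
  }

  public static void main(String[] args) {
    System.out.println(extractClientQueryId(Map.of(CLIENT_QUERY_ID, "my-query-1")));
    System.out.println(extractClientQueryId(Map.of(CLIENT_QUERY_ID, "   ")));
  }
}
```

Callers can then use ifPresent or map instead of a null check, at the cost of one small allocation per call.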
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #14823 +/- ##
============================================
+ Coverage 61.75% 63.62% +1.87%
- Complexity 207 1482 +1275
============================================
Files 2436 2727 +291
Lines 133233 152607 +19374
Branches 20636 23582 +2946
============================================
+ Hits 82274 97099 +14825
- Misses 44911 48186 +3275
- Partials 6048 7322 +1274
Hi Siddharth, AFAIK the current cancellation feature depends on the internal requestId generated by the broker itself. That request id is not returned until the query completes, so an external user first has to ask for the actively running queries, find the requestId assigned to the query of interest in the returned array (by comparing the query body), and finally use the cancel operation to abort it. That's two round trips between the user and the cluster. With a client-provided id, the user can skip one step and go straight to the cancel operation, using their own id to abort the query.
As some extra context, the end goal is to enable a "Cancel" button in the UI that the customer can use to abort an ongoing query. Using a query id provided by the customer or by the UI itself, that can be done without retrieving any internal id.
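The difference between the two flows can be sketched with a toy in-memory registry. This is an illustration under assumptions, not the broker's code: the class and method names are hypothetical, and cancelling is reduced to removing the entry from the registry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RunningQueriesSketch {
  // Broker-generated requestId -> query text.
  private final Map<Long, String> queriesById = new ConcurrentHashMap<>();
  // Client-provided id -> broker-generated requestId.
  private final Map<String, Long> requestIdsByClientId = new ConcurrentHashMap<>();

  void register(long requestId, String query, String clientQueryId) {
    queriesById.put(requestId, query);
    if (clientQueryId != null) {
      requestIdsByClientId.put(clientQueryId, requestId);
    }
  }

  // Existing flow, trip 1: list running queries so the caller can match on query text.
  Map<Long, String> runningQueries() {
    return Map.copyOf(queriesById);
  }

  // Existing flow, trip 2: cancel using the broker-generated id.
  boolean cancel(long requestId) {
    requestIdsByClientId.values().remove(Long.valueOf(requestId));
    return queriesById.remove(requestId) != null;
  }

  // Proposed flow, single trip: the caller already knows its own id.
  boolean cancelByClientId(String clientQueryId) {
    Long requestId = requestIdsByClientId.get(clientQueryId);
    return requestId != null && cancel(requestId);
  }

  public static void main(String[] args) {
    RunningQueriesSketch registry = new RunningQueriesSketch();
    registry.register(1L, "SELECT COUNT(*) FROM t", "ui-button-1");
    System.out.println(registry.cancelByClientId("ui-button-1"));
  }
}
```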
I'm adding several comments but I wasn't able to read the whole PR.
Although I'm asking for changes, it is a good PR overall. We just need to finish the last mile.
} catch (InterruptedException e) {
  //TODO: handle interruption
  //Thread.currentThread().interrupt();
}
we need to fix this TODO before merging
Any suggestion on how we should deal with an interruption here? Just warn it and skip the sleep or propagate the error?
Given that you cannot throw InterruptedException here, you have to set the interruption flag again and probably throw another exception.
Makes total sense f8d23cd
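The suggestion above (restore the interrupt flag, then throw a different exception) could look roughly like this. It is a sketch only: the class name, the signature of sleep, and the choice of IllegalStateException are assumptions, and the referenced commit may handle it differently.

```java
final class SleepSketch {
  // Sleeps for the given time. If the thread is interrupted, the interrupt flag is
  // restored (Thread.sleep clears it when throwing) and an unchecked exception is
  // thrown, since this method cannot declare InterruptedException.
  static long sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while sleeping", e);
    }
    return millis;
  }

  public static void main(String[] args) {
    // Simulate a cancellation arriving before the sleep starts.
    Thread.currentThread().interrupt();
    try {
      sleep(1_000);
    } catch (IllegalStateException e) {
      System.out.println("flag restored: " + Thread.currentThread().isInterrupted());
    }
  }
}
```

Restoring the flag matters because code further up the stack may check it to decide whether to stop working.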
boolean enableQueryCancellation =
    Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
if (enableQueryCancellation) {
  _queriesById = new ConcurrentHashMap<>();
  _clientQueryIds = new ConcurrentHashMap<>();
} else {
  _queriesById = null;
  _clientQueryIds = null;
}
This is not something introduced in this PR, but something I think we need to take care of in the future:
We use BaseBrokerRequestHandler as the root/common state for the broker, probably for historical reasons, but that does not match reality. A single broker may have SSE, MSE, GRPC and even TSE queries running at the same time. It would be a better design to have a shared state between them instead of the trick we do with the delegate.
I agree with the shared state refactor, we should write it down somewhere so we actually do it ;-)
@@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq
    @Nullable HttpHeaders httpHeaders, AccessControl accessControl)
    throws Exception;

protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor,
We need javadoc here to explain how it should work. At least we should say that queryId may be a client or pinot generated id.
Actually, the queryId received here always refers to a broker-generated internal id. The clientQueryId -> brokerQueryId translation is done by BaseBrokerRequestHandler.cancelQueryByClientId.
Added some minimal javadoc here: 52998d3
I tried to mimic the current code design for handleRequest, but the existence of two handleRequest methods here is a bit confusing:
- A public method implemented from the interface and called from the endpoint layer, which receives a SqlNodeAndOptions parameter.
- A protected method called from the previous one, which already receives a requestId and the query string itself.
To add to the confusion, neither of the two methods has javadoc.
This could probably be designed better if we move forward with the proposed shared state design.
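The split described in this reply, where the base class translates the client id and the engine-specific method only ever sees a broker-generated id, can be sketched as below. The signatures are deliberately simplified (the real handleCancel takes more parameters than shown in the diff excerpt), and the class names, onQueryStart, and the javadoc wording are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

abstract class CancelHandlerSketch {
  // Client-provided id -> broker-generated id, filled in when a query starts.
  private final Map<String, Long> clientQueryIds = new ConcurrentHashMap<>();

  void onQueryStart(long brokerQueryId, String clientQueryId) {
    clientQueryIds.put(clientQueryId, brokerQueryId);
  }

  /**
   * Cancels a running query in an engine-specific way.
   *
   * @param queryId always the broker-generated internal id, never a client-provided one
   * @return true if a running query was found and cancellation was requested
   */
  protected abstract boolean handleCancel(long queryId);

  /** Translates the client-provided id to the broker-generated id, then delegates. */
  public final boolean cancelQueryByClientId(String clientQueryId) {
    Long brokerQueryId = clientQueryIds.remove(clientQueryId);
    return brokerQueryId != null && handleCancel(brokerQueryId);
  }
}

final class CancelHandlerDemo {
  public static void main(String[] args) {
    CancelHandlerSketch handler = new CancelHandlerSketch() {
      @Override
      protected boolean handleCancel(long queryId) {
        System.out.println("cancelling broker query " + queryId);
        return true;
      }
    };
    handler.onQueryStart(42L, "ui-7");
    handler.cancelQueryByClientId("ui-7");
  }
}
```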
Thanks for the review, @gortiz
Besides my doubts about how to handle the sleep interruption (is it really necessary now that we only enable it during tests?) and some future tasks and refactors derived from the PR, I believe I have followed your advice on all your suggestions.
- clientQueryId query option that can be used when using the clientQuery/{clientQueryId} endpoint.
- sleep(ms) function, as of today only recommended for testing purposes.
Some refactoring was involved to reuse as much cancellation logic as possible between SSQE and MSQE.