
Rework MSE query throttling to take into account estimated number of threads used by a query #14847

Merged
yashmayya merged 2 commits into apache:master from msqe-query-threads-throttling on Jan 24, 2025

Conversation

yashmayya
Collaborator

  • PR #14574 (Add cluster configuration to allow limiting the number of multi-stage queries running concurrently) introduced a mechanism to throttle multi-stage engine queries at the broker level using a new beta cluster config. That mechanism treated all queries as equivalent and assumed that queries were evenly distributed across brokers.
  • This patch updates the mechanism to instead take into account the estimated number of threads that a query would spawn on the servers, and the beta cluster config is changed to reflect this. This is a better model because users no longer need to tune the config based on their exact query workload; they can instead pick an estimated value based on the cluster size and instance sizes. Furthermore, mixed workloads are better supported, with larger queries potentially being blocked for longer while smaller queries are executed (although query starvation and timeouts are a possibility with this primitive model).
  • Note that prior to this change, there was an issue with the way the throttling threshold was determined: each broker calculated the threshold as maxConcurrentQueries * numServers / numBrokers. Since the max concurrent queries setting is a "per server" config, this calculation incorrectly assumed that each query executes on a single server (rather than being dispatched to all servers, which is the more realistic assumption).
  • With the change here to throttle based on the estimated number of threads, the throttling threshold becomes maxServerQueryThreads * numServers / numBrokers, which only assumes that queries are evenly distributed across brokers (and makes no assumption about the fanout to servers). This feature isn't intended to completely prevent large queries from executing, so the cluster config should be set to a value at least large enough to accommodate queries that can spawn up to maxServerQueryThreads * numServers / numBrokers threads (see the sketch after this list).
  • An alternative implementation would be to track the estimated number of threads on a per-server basis (using the worker <-> server mapping in the query plan), but this has many more edge cases and much higher overhead on large clusters with many servers (since we'd need to acquire permits for many servers for every single query).
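
To make the new model concrete, here is a rough, hypothetical sketch of broker-side throttling based on estimated query threads. The class and method names are illustrative only and are not the actual Pinot MultiStageQueryThrottler API:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: each broker gets a share of the cluster-wide thread budget
// and admits a query only if its estimated thread count fits in that budget.
public class ThreadBasedQueryThrottler {
  private final Semaphore _semaphore;

  public ThreadBasedQueryThrottler(int maxServerQueryThreads, int numServers, int numBrokers) {
    // Assumes queries are evenly distributed across brokers, so each broker
    // throttles against maxServerQueryThreads * numServers / numBrokers permits.
    int totalPermits = Math.max(1, maxServerQueryThreads * numServers / numBrokers);
    _semaphore = new Semaphore(totalPermits, true);
  }

  /** Blocks (up to the timeout) until the query's estimated threads can be admitted. */
  public boolean tryAcquire(int estimatedThreads, long timeoutMs) throws InterruptedException {
    return _semaphore.tryAcquire(estimatedThreads, timeoutMs, TimeUnit.MILLISECONDS);
  }

  /** Must be called once the query completes, whether it succeeded or failed. */
  public void release(int estimatedThreads) {
    _semaphore.release(estimatedThreads);
  }
}
```

Note that a query requesting more permits than the total is not rejected outright under this model; it simply waits until enough permits are free, which is why the config needs to be at least as large as the biggest expected query.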

@yashmayya added the enhancement, Configuration (config changes: addition/deletion/change in behavior) and multi-stage (related to the multi-stage query engine) labels on Jan 20, 2025
codecov-commenter commented Jan 20, 2025

Codecov Report

Attention: Patch coverage is 47.50000% with 21 lines in your changes missing coverage. Please review.

Project coverage is 63.73%. Comparing base (59551e4) to head (53e22af).
Report is 1617 commits behind head on master.

Files with missing lines Patch % Lines
...ot/query/planner/physical/DispatchableSubPlan.java 0.00% 15 Missing ⚠️
...requesthandler/MultiStageBrokerRequestHandler.java 0.00% 3 Missing ⚠️
...roker/requesthandler/MultiStageQueryThrottler.java 85.71% 0 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14847      +/-   ##
============================================
+ Coverage     61.75%   63.73%   +1.98%     
- Complexity      207     1469    +1262     
============================================
  Files          2436     2708     +272     
  Lines        133233   151441   +18208     
  Branches      20636    23380    +2744     
============================================
+ Hits          82274    96518   +14244     
- Misses        44911    47673    +2762     
- Partials       6048     7250    +1202     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.69% <47.50%> (+1.98%) ⬆️
java-21 63.63% <47.50%> (+2.00%) ⬆️
skip-bytebuffers-false 63.73% <47.50%> (+1.98%) ⬆️
skip-bytebuffers-true 63.59% <47.50%> (+35.86%) ⬆️
temurin 63.73% <47.50%> (+1.98%) ⬆️
unittests 63.72% <47.50%> (+1.98%) ⬆️
unittests1 56.31% <0.00%> (+9.42%) ⬆️
unittests2 34.02% <47.50%> (+6.28%) ⬆️

Flags with carried forward coverage won't be shown.


@yashmayya force-pushed the msqe-query-threads-throttling branch from f0b0c1f to deaca57 on January 21, 2025 10:12
@yashmayya marked this pull request as ready for review on January 23, 2025 12:13
Comment on lines 48 to 51
if (permits < _totalPermits.get()) {
reducePermits(_totalPermits.get() - permits);
} else if (permits > _totalPermits.get()) {
release(permits - _totalPermits.get());
Contributor

This is confusing. If this method can be called concurrently, it doesn't seem like we're protecting ourselves against races. If it cannot be called concurrently, it seems odd to use an atomic integer.

Collaborator Author

You're right, we can't actually have concurrent modifications - only potential concurrent reads during modifications. I think we can simply use a volatile int instead (even the volatile isn't strictly necessary since we can tolerate stale values without many issues, tbh).

Contributor

That is what I thought. Volatile should be enough to be correct. In practice, you could even remove the volatile as you suggest, but in theory that could mean the reader threads never see the new value. Also, IIRC, on x86, reading a volatile attribute that was not modified costs exactly the same as reading a non-volatile attribute. The only actual issue is if that attribute ends up creating false sharing with another frequently written, non-volatile attribute.

Collaborator Author

Interesting, thanks for that info!

The only actual issue is if that attribute ends up creating false sharing with another, frequently written, not volatile attribute

Wouldn't the other attribute also need to be volatile?

Contributor

I was wrong. The issue is when a frequently written volatile is in the same cache line as a non-volatile, frequently read attribute. Each write to the volatile evicts the cache line, so other threads reading the non-volatile attribute need to fetch the line again (https://medium.com/@ali.gelenler/cache-trashing-and-false-sharing-ce044d131fc0).
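
For context, a minimal sketch of what the volatile-based version discussed above could look like, assuming the quoted snippet lives in a java.util.concurrent.Semaphore subclass; the class name and setPermits method here are illustrative, not necessarily the actual code:

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch: an adjustable semaphore where the total permit count is a
// volatile int, since resizes are never concurrent and readers only need visibility.
public class AdjustableSemaphoreSketch extends Semaphore {
  private volatile int _totalPermits;

  public AdjustableSemaphoreSketch(int permits) {
    super(permits, true);
    _totalPermits = permits;
  }

  /** Resize the semaphore; callers must ensure this isn't invoked concurrently. */
  public void setPermits(int permits) {
    if (permits < _totalPermits) {
      reducePermits(_totalPermits - permits);  // shrink by retiring permits
    } else if (permits > _totalPermits) {
      release(permits - _totalPermits);        // grow by releasing extra permits
    }
    _totalPermits = permits;
  }

  public int getTotalPermits() {
    return _totalPermits;
  }
}
```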

@yashmayya force-pushed the msqe-query-threads-throttling branch from deaca57 to 53e22af on January 23, 2025 13:21
@yashmayya merged commit c239952 into apache:master on Jan 24, 2025
21 checks passed