Dynamically configure SemaphoreBackPressureHandler with BackPressureLimiter (#1251) #1308
base: main
Conversation
@loicrouchon If I read this correctly, to build #481 (for example, process 5 messages every second), I would need to implement the BackPressureLimiter with logic that sets/increases the limit to/by 5 every second?
@jeroenvandevelde, the goal of the `BackPressureLimiter` is to cap how many messages may be fetched at the next polling attempt. However, it does not consider how many messages are currently being processed by the queue consumer, nor does it check the limit again once messages have been fetched. So what you would need to do to implement a rate-limiter is a 2-step approach.
In case the rate-limit is not a hard limit (i.e. you can temporarily have short bursts over the limit), then step 1 is not necessary. So rate-limiting can somehow be performed, but it requires extra care if there are hard constraints regarding the rate limit. The solution implemented in this PR primarily aims at use cases where the pressure is measured from an external system, in which case there is by design a delay between measuring the pressure on the downstream system and reacting to it. A good example would be when pushing messages to another system (which could be another queue, for example). For instance, in the case where the downstream system is an SQS queue:
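The downstream-queue case above could be sketched roughly as follows. The `BackPressureLimiter` interface is the one introduced by this PR (reproduced here so the sketch compiles standalone); `DownstreamQueueBackPressureLimiter`, the backlog supplier, and the threshold are illustrative — in real code the supplier would wrap an SQS `GetQueueAttributes` call reading `ApproximateNumberOfMessages`:

```java
import java.util.function.IntSupplier;

// The PR's single-method interface, reproduced so this sketch is self-contained.
interface BackPressureLimiter {
    int limit();
}

/**
 * Illustrative limiter: allow consumption only while the downstream queue's
 * backlog stays under a threshold. `backlogSupplier` stands in for a real
 * GetQueueAttributes(ApproximateNumberOfMessages) call.
 */
class DownstreamQueueBackPressureLimiter implements BackPressureLimiter {

    private final IntSupplier backlogSupplier;
    private final int maxBacklog;

    DownstreamQueueBackPressureLimiter(IntSupplier backlogSupplier, int maxBacklog) {
        this.backlogSupplier = backlogSupplier;
        this.maxBacklog = maxBacklog;
    }

    @Override
    public int limit() {
        // Returning 0 (or less) puts the queue consumption on standby.
        return Math.max(0, maxBacklog - backlogSupplier.getAsInt());
    }
}
```

With a backlog of 30 and a threshold of 100, the limiter grants up to 70 messages; once the backlog reaches or exceeds the threshold, it returns 0 and consumption goes on standby.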
PS: the advantages of the example (pacing message publication to a downstream queue) might not be obvious, but there are two:
@loicrouchon I missed the fact that if it reads a message, it doesn't subtract it from the limit. In my case, where I would like to have a max rate of x messages/second, your approach seems a bit odd, as all the information for this is available in the library: how fast we are processing, and the rate at which we would like to go (a configurable value). I can follow if it is based on parameters outside of the library, which your case sounds like.
@jeroenvandevelde I think this information is only fully available in a very limited number of use cases. Most of the queue consumer implementations I had to implement in the past were doing a variable number of API calls. It usually depended on the content of the message being consumed, some persistent state, or the result of previous API calls. So limiting consumption from the queue at a fixed rate per second does not account for cases where the rate-limited API we're trying to protect would be called a variable number of times (0 times or more). That is why, in the solution presented here, the limit is computed by the application. I totally agree with you, it is more complicated than the use case you described. But it would avoid the issue of blocking the queue consumption for messages that did not trigger the API call that should be rate-limited.
Please do so, and let me know about your findings.
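A limiter that reflects an API-call budget rather than a raw message count, as discussed above, could look like the following sketch. The interface is the one from this PR; `ApiBudgetBackPressureLimiter`, its parameters, and the budget supplier are hypothetical (the supplier would typically read the remaining tokens of a rate limiter):

```java
import java.util.function.IntSupplier;

// The PR's single-method interface, reproduced so this sketch is self-contained.
interface BackPressureLimiter {
    int limit();
}

/**
 * Hypothetical limiter: derive the consumption limit from the remaining
 * API-call budget, conservatively assuming each message may trigger up to
 * `maxCallsPerMessage` calls to the rate-limited API.
 */
class ApiBudgetBackPressureLimiter implements BackPressureLimiter {

    private final IntSupplier remainingApiCalls; // e.g. tokens left in a token bucket
    private final int maxCallsPerMessage;        // worst-case API calls per message

    ApiBudgetBackPressureLimiter(IntSupplier remainingApiCalls, int maxCallsPerMessage) {
        this.remainingApiCalls = remainingApiCalls;
        this.maxCallsPerMessage = maxCallsPerMessage;
    }

    @Override
    public int limit() {
        // Only admit messages whose worst-case API usage fits the budget;
        // messages that trigger fewer (or no) calls leave budget for later polls.
        return remainingApiCalls.getAsInt() / maxCallsPerMessage;
    }
}
```

This keeps messages that never call the protected API from consuming the rate budget, at the cost of being conservative for messages that call it fewer times than the worst case.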
Whether these situations are very limited or a normal use case will depend a lot on your context. We have designed our system so that every HTTP call to an (external) endpoint has a queue in front of it to handle the overflow. This indeed doesn't cover the situation where system A is not going at full speed and therefore system B could go faster against the same endpoint.
Hi @loicrouchon and @jeroenvandevelde. @loicrouchon, overall this looks very promising, thanks! I wonder why you chose to change the current `SemaphoreBackPressureHandler` rather than wrapping it. Also, would it make sense maybe to have a list of `BackPressureHandler` instances? It also seems like the changes might fix #1187, which is an outstanding issue, so that's great. Sorry I couldn't discuss this earlier with you in the issue, and thanks again to you both for bringing these enhancements up!
Hi @tomazfernandes and Happy New Year!
I thought about doing so, but gave up because the release method was somewhat complicated to wrap. But that was at the beginning of my attempts. I now have a much better understanding of the `SemaphoreBackPressureHandler`. I'll keep you posted about my progress.
…BackPressureHandler (awspring#1251)
@tomazfernandes I pushed a version using a wrapper over the SemaphoreBackPressureHandler. I'm quite happy with how it simplifies things now, and I'm looking for feedback before continuing with the PR (maybe renaming a few things to improve clarity and updating the reference documentation).
I'm not sure; I think it can get tricky very quickly when it comes to the permit release logic. However, you could limit the number of permits via different `BackPressureLimiter` implementations.
Regarding this, I'm not 100% sure it would fix it, so I would need to look more into it.
Thanks for the update @loicrouchon, I think we're definitely moving in the right direction here.
I'll share what's in my mind for your consideration. I think it's similar to what you have, but with a slightly different approach that might simplify things a bit while bringing more flexibility. I might be missing something though.

I think each `BackPressureHandler` implementation could manage its own permits independently. So, we could have a composite handler holding a list of `BackPressureHandler` instances, and the batch methods themselves would request permits from every handler in the list. On a poll, we'd then call each handler, use the minimum number of permits granted, and return the surplus to the handlers that granted more.

The benefit I see for this approach is that we can keep each `BackPressureHandler` implementation simple and focused on a single concern. We could also, in the future, separate the Low / High Throughput logic into its own handler.

Example Scenario

I'll illustrate with an example just as a sanity check. Let's say we have 3 BPH implementations in the composite.

On the first poll, each handler grants what it can and the minimum is used. On the second poll, the same happens against the updated state. Let's say at some point the downstream API is holding requests with a poorly configured 10-second timeout; the limiting handler would then grant fewer (or zero) permits, throttling consumption. As the consumers release permits, the handlers recover capacity and consumption ramps back up.

I don't think this is too different from what you are proposing, in that we're effectively limiting the amount of permits returned, while the logic for each concern stays in its own handler. Of course, there's a lot of complexity involved and I might be missing something. Please let me know your thoughts. Thanks.
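The composite idea sketched in this comment could look roughly like the following. All names and signatures here are illustrative simplifications for the request/release flow, not the library's actual `BackPressureHandler` API:

```java
import java.util.List;

// Simplified stand-in for the discussed handler contract (hypothetical signatures).
interface SimpleBackPressureHandler {
    int request(int amount);  // returns the number of permits granted
    void release(int amount); // returns finished or unused permits
}

/** Minimal handler with a fixed permit pool, for illustration only. */
class FixedPermitsHandler implements SimpleBackPressureHandler {
    private int available;
    FixedPermitsHandler(int totalPermits) { this.available = totalPermits; }
    @Override public int request(int amount) {
        int granted = Math.min(amount, available);
        available -= granted;
        return granted;
    }
    @Override public void release(int amount) { available += amount; }
    int available() { return available; }
}

/**
 * Composite sketch: ask every handler for up to `amount` permits, keep the
 * minimum granted, and hand the surplus back to handlers that granted more.
 */
class CompositeBackPressureHandler implements SimpleBackPressureHandler {
    private final List<SimpleBackPressureHandler> handlers;

    CompositeBackPressureHandler(List<SimpleBackPressureHandler> handlers) {
        this.handlers = handlers;
    }

    @Override
    public int request(int amount) {
        int[] granted = new int[handlers.size()];
        int min = amount;
        for (int i = 0; i < handlers.size(); i++) {
            granted[i] = handlers.get(i).request(amount);
            min = Math.min(min, granted[i]);
        }
        // Return surplus permits to handlers that granted more than the minimum.
        for (int i = 0; i < handlers.size(); i++) {
            if (granted[i] > min) {
                handlers.get(i).release(granted[i] - min);
            }
        }
        return min;
    }

    @Override
    public void release(int amount) {
        for (SimpleBackPressureHandler handler : handlers) {
            handler.release(amount);
        }
    }
}
```

With two handlers holding 10 and 5 permits, a request for 10 yields 5 (the minimum), the first handler gets its 5 surplus permits back, and a later release of 5 restores both pools.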
📢 Type of change
📜 Description
This change enhances the `SemaphoreBackPressureHandler` with support for a new `BackPressureLimiter` interface.

This `BackPressureLimiter` interface is to be implemented by applications. It has a single method, `int limit()`, that returns the number of permits that can be consumed by the `SemaphoreBackPressureHandler`.

Before each polling, the limit will be checked by the `SemaphoreBackPressureHandler`, which adjusts the number of permits that can be requested (in the range `[0, totalPermits]`). The limit returned by the `BackPressureLimiter#limit()` method is to be understood as the number of messages that can be consumed from the queue at the current instant. If it is `0` (or less), the queue consumption is considered to be on standby.

When a polling is performed and the consumption is on standby, the `SemaphoreBackPressureHandler` will sleep for the `standbyLimitPollingInterval` before allowing the next polling attempt (we cannot rely on the semaphore acquire timeouts here, hence the need for `standbyLimitPollingInterval`).

Both the `BackPressureLimiter` and `standbyLimitPollingInterval` can be configured via the `ContainerOptions`.
💡 Motivation and Context
The goal of this change is to address #1251.
#1251 aims to provide a more general solution to issues like #481 by giving users control over how they would like to dynamically limit message consumption from an SQS queue. Typical use cases could be rate-limiters (like #481) or more complicated setups involving measuring the load of a downstream system and adjusting or stopping message consumption accordingly.
💚 How did you test it?
So far, this change was only verified via integration tests covering various scenarios:
📝 Checklist
🔮 Next steps