
Dynamically configure SemaphoreBackPressureHandler with BackPressureLimiter (#1251) #1308

Open · wants to merge 2 commits into base: main

Conversation

loicrouchon

📢 Type of change

  • Bugfix
  • New feature
  • Enhancement
  • Refactoring

📜 Description

This change enhances the SemaphoreBackPressureHandler with support for a new BackPressureLimiter interface.

The BackPressureLimiter interface is to be implemented by applications. It has a single method, int limit(), which returns the number of permits that can be consumed by the SemaphoreBackPressureHandler.

Before each poll, the SemaphoreBackPressureHandler checks the limit and adjusts the number of permits that can be requested (in the range [0, totalPermits]). The limit returned by BackPressureLimiter#limit() is to be understood as the number of messages that can be consumed from the queue at the current instant. If it is 0 (or less), queue consumption is considered to be on standby.

When a poll is attempted while consumption is on standby, the SemaphoreBackPressureHandler will sleep for standbyLimitPollingInterval before allowing the next polling attempt (we cannot rely on the semaphore acquire timeouts here, hence the need for standbyLimitPollingInterval).

Both the BackPressureLimiter and standbyLimitPollingInterval can be configured via the ContainerOptions.
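
For illustration, here is a minimal sketch of the interface and its wiring as described above; the ContainerOptions builder method names are assumptions based on this description, not the final API:

```java
import java.time.Duration;

import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class BackPressureLimiterExample {

    // The new interface, as described above: returns the number of messages
    // that can be consumed from the queue at the current instant.
    public interface BackPressureLimiter {
        int limit();
    }

    // Hypothetical wiring; the backPressureLimiter(...) and
    // standbyLimitPollingInterval(...) builder methods are assumptions.
    public static SqsMessageListenerContainer<Object> container(SqsAsyncClient sqsAsyncClient,
            BackPressureLimiter limiter) {
        return SqsMessageListenerContainer.builder()
                .sqsAsyncClient(sqsAsyncClient)
                .queueNames("my-queue")
                .configure(options -> options
                        .backPressureLimiter(limiter) // assumed new option
                        .standbyLimitPollingInterval(Duration.ofMillis(100))) // assumed new option
                .build();
    }
}
```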

💡 Motivation and Context

The goal of this change is to address #1251.

#1251 aims to provide a more general solution to issues like #481 by giving users control over how they would like to dynamically limit message consumption from an SQS queue. Typical use cases could be rate-limiters (like #481) or more complex setups involving measuring the load of a downstream system and adjusting or stopping message consumption accordingly.

💚 How did you test it?

So far, the change has only been tested via integration tests covering various scenarios:

  • Static limit of 0, totalPermits or more, and various values in between.
  • Dynamic limit changing, and checking that the queue processing adjusts itself (in a "synchronized" way).
  • Dynamic limit changing, and checking that the queue processing adjusts itself (not in a synchronized way, which is more realistic of what will happen, but makes it harder to make strict assumptions about the moving target).

📝 Checklist

  • I reviewed submitted code
  • I added tests to verify changes
  • I updated reference documentation to reflect the change
  • All tests passing
  • No breaking changes

🔮 Next steps

  • Collecting feedback on the approach
  • Work on updating the reference documentation to mention the new capability.

@github-actions bot added the component: sqs label on Dec 20, 2024
@jeroenvandevelde

@loicrouchon If I read this correctly, and I would need to build #481 with, for example, processing 5 messages every second:

I would need to implement the BackPressureLimiter with logic to set/increase the limit to/by 5 every second? As every message received would subtract one from the BackPressureLimiter.

@loicrouchon
Author

@jeroenvandevelde, the goal of the BackPressureLimiter is to provide information on the maximum number of messages that can be read from the queue in the next polling cycle. This is done by calling BackPressureLimiter#limit() before trying to acquire permits to poll messages from the queue.

However, it does not consider how many messages are currently being processed by the queue consumer, nor does it re-check BackPressureLimiter#limit() after obtaining permits to see whether the number should be reduced.

So to implement a rate-limiter, you would need a two-step approach:

  1. Rate-limit implementation: In the queue consumer, protect calls to the rate-limited API so that they effectively wait before sending an API call if you're over the limit. This ensures the hard constraint on the rate limit is applied (see the sketch below).
  2. Making the queue rate-limit friendly: Configure the SQS container with a BackPressureLimiter#limit() that reads the current API rate and translates it into a limit. This allows the queue consumption to avoid polling messages that would have to wait and therefore potentially exceed the visibility timeout.

In case the rate limit is not a hard limit (i.e. short bursts over the limit are acceptable), then step 1 is not necessary.
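
For step 1, a minimal sketch of guarding the API call itself, here using Guava's RateLimiter as one possible implementation (the 5 calls/second figure is just an example):

```java
import com.google.common.util.concurrent.RateLimiter;

// Step 1 sketch: enforce the hard rate limit at the call site.
public class RateLimitedApiClient {

    private final RateLimiter rateLimiter = RateLimiter.create(5.0); // 5 calls/second

    public void callDownstreamApi(String payload) {
        // Blocks until a permit is available, so the hard limit is never exceeded.
        rateLimiter.acquire();
        // ... perform the actual API call here ...
    }
}
```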

So rate-limiting can be performed to some extent, but it requires extra care if there are hard constraints on the rate limit. The solution implemented in this PR primarily aims at use cases where the pressure is measured from an external system, in which case there is by design a delay between measuring the pressure on the downstream system and reacting to it.

A good example would be when pushing messages to another system (for example, another queue). In this case, the BackPressureLimiter#limit() could be built so that it measures the pressure on that system and maps the pressure level to a certain limit.

For example, in the case the downstream system is an SQS queue:

  • If the number of visible messages < 10, then limit = total permits (full speed)
  • If the number of visible messages < 100, then limit = 50% of total permits (reduced speed)
  • If the number of visible messages ≥ 100, then limit = 0 (standby)
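
A rough sketch of such a limiter, using the AWS SDK v2 to read the downstream queue's attributes (the class itself is illustrative, not part of this PR):

```java
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

// Illustrative only: maps the downstream queue's visible-message count
// to a permit limit following the thresholds above.
public class DownstreamQueueBackPressureLimiter implements BackPressureLimiter {

    private final SqsClient sqsClient;
    private final String downstreamQueueUrl;
    private final int totalPermits;

    public DownstreamQueueBackPressureLimiter(SqsClient sqsClient, String downstreamQueueUrl, int totalPermits) {
        this.sqsClient = sqsClient;
        this.downstreamQueueUrl = downstreamQueueUrl;
        this.totalPermits = totalPermits;
    }

    @Override
    public int limit() {
        String visible = sqsClient.getQueueAttributes(request -> request
                .queueUrl(downstreamQueueUrl)
                .attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES))
            .attributes()
            .get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES);
        int visibleMessages = Integer.parseInt(visible);
        if (visibleMessages < 10) {
            return totalPermits; // full speed
        }
        if (visibleMessages < 100) {
            return totalPermits / 2; // reduced speed
        }
        return 0; // standby
    }
}
```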

PS: the advantages of pacing message publication to a downstream queue might not be obvious, but there are two:

  • Resource consumption: it avoids consuming cluster resources (CPU, memory) and the resources of other systems involved in publishing that message, so that critical systems have more bandwidth to catch up.
  • Prioritization: it allows different publishers to publish messages with different priorities. One publisher might be low priority and should stop publishing messages when there is high load, while another might have a higher priority and should not stop yet.

@jeroenvandevelde

jeroenvandevelde commented Dec 21, 2024

@loicrouchon I missed the fact that reading a message doesn't subtract it from the limit.

In my case, where I would like to have a max rate of x messages/second, your approach seems a bit odd, as all the information for this is available in the library:

How fast we are processing, and the rate at which we would like to go (a configurable value).
I would therefore prefer not to push the complexity of keeping track of how many messages we are retrieving, and of changing the limit based on that, onto the user of the library.

I can follow it if it is based on parameters outside of the library, which your case sounds like.
I will try to find some time in the coming days to see if I can find an alternative way to do this.

@loicrouchon
Author

How fast we are processing, and the rate at which we would like to go (a configurable value).
I would therefore prefer not to push the complexity of keeping track of how many messages we are retrieving, and of changing the limit based on that, onto the user of the library.

@jeroenvandevelde I think this information is only fully available in a very limited number of use cases. Most of the queue consumer implementations I have had to implement in the past performed a variable number of API calls, usually depending on the content of the message being consumed, some persistent state, or the result of previous API calls.

So limiting consumption from the queue at a fixed rate per second does not account for cases where the rate-limited API we're trying to protect would be called a variable number of times (zero or more). That is why, in the solution presented here, BackPressureLimiter#limit() measures the pressure and translates it into how many messages can be consumed at that point in time; it does not compute how many messages it has allowed or how many it can still allow.

I totally agree with you that it is more complicated than the use case you described. But it avoids the issue of blocking queue consumption for messages that did not trigger the API call that should be rate-limited.

I will try to find some time in the coming days to see if I can find an alternative way to do this.

Please do so, and let me know about your findings.

@jeroenvandevelde

jeroenvandevelde commented Dec 21, 2024

I think this information is only fully available in a very limited number of use cases. Most of the queue consumer implementations I have had to implement in the past performed a variable number of API calls, usually depending on the content of the message being consumed, some persistent state, or the result of previous API calls.

Whether these situations are very limited or a normal use case will depend a lot on your context.
Maybe both situations require separate solutions, each with its own amount of complexity.
I totally understand that your solution gives room for more fine-grained rate limiting, as you can increase/decrease the limit based on input from other systems.

We have designed our system so that every HTTP call to an (external) endpoint has a queue in front of it to handle the overflow.
We do a kind of intake with both internal and external APIs, after which we get x requests per second; especially with external APIs it is quite common that you pay for/get x requests per second.

This indeed doesn't cover the situation where system A is not going at full speed and therefore system B could go faster against the same endpoint.

@tomazfernandes
Contributor

Hi @loicrouchon and @jeroenvandevelde.

@loicrouchon, overall this looks very promising - thanks!

I wonder why you chose to change the current SemaphoreBackPressureHandler implementation though - it seems like the functionalities are complementary to each other? Would it make sense to have this logic in a separate class, maybe a wrapper as you suggested?

Also, would it make sense maybe to have a list of BackPressureHandlers and have each return a number of permits they'll allow for that poll or something similar?

It also seems like the changes might fix #1187, which is an outstanding issue, so that's great.

Sorry I couldn't discuss this earlier with you in the issue, and thanks again to you both for bringing these enhancements up!

@loicrouchon
Author

Hi @tomazfernandes and Happy New Year!

I wonder why you chose to change the current SemaphoreBackPressureHandler implementation though - it seems like the functionalities are complementary to each other? Would it make sense to have this logic in a separate class, maybe a wrapper as you suggested?

I thought about doing so, but gave up because the release method was somewhat complicated to wrap. That was at the beginning of my attempts, though. I now have a much better understanding of the SemaphoreBackPressureHandler, and I can try to revisit that aspect, as I agree with you that it would be better to have a composable approach to keep the complexity of each component minimal.

I'll keep you posted about my progress.

@loicrouchon
Author

@tomazfernandes I pushed a version using a wrapper over the SemaphoreBackPressureHandler. I'm quite happy with how it simplifies things, and I'm looking for feedback before continuing with the PR (maybe renaming a few things to improve clarity and updating the reference documentation).

Also, would it make sense maybe to have a list of BackPressureHandlers and have each return a number of permits they'll allow for that poll or something similar?

I'm not sure; I think it can get tricky very quickly when it comes to the requestBatch/releaseBatch/getBatchSize methods.

However, if you would like to limit the number of permits via different BackPressureLimiters, nothing prevents you from creating another BackPressureLimiter whose implementation calls the other ones and returns the minimum value. Would that do the trick you were mentioning?
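
Since BackPressureLimiter has a single int limit() method, such a composition could be as simple as (sketch; limiterA and limiterB are placeholders for actual implementations):

```java
import java.util.List;

// Sketch: combine several limiters by taking the most restrictive value.
List<BackPressureLimiter> limiters = List.of(limiterA, limiterB);
BackPressureLimiter composite = () -> limiters.stream()
        .mapToInt(BackPressureLimiter::limit)
        .min()
        .orElse(Integer.MAX_VALUE); // no limiters means no limit
```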

It also seems like the changes might fix #1187, which is an outstanding issue, so that's great.

Regarding this, I'm not 100% sure it would fix it, so I would need to look into it more.

@tomazfernandes
Contributor

Thanks for the update @loicrouchon, I think we're definitely moving in the right direction here.

Also, would it make sense maybe to have a list of BackPressureHandlers and have each return a number of permits they'll allow for that poll or something similar?

I'm not sure; I think it can get tricky very quickly when it comes to the requestBatch/releaseBatch/getBatchSize methods.

I'll share what I have in mind for your consideration - I think it's similar to what you have, but with a slightly different approach that might simplify things a bit while bringing more flexibility. I might be missing something though.

I think each BackPressureHandler implementation accounts for a specific capacity: SemaphoreBPH accounts for the app's capacity, RateLimitingBPH could account for a downstream API capacity, and we could have another BPH implementation for checking a downstream queue capacity.

So, we could have a CompositeBackPressureHandler implementation of BatchAwareBPH that would contain a list of BPHs that it would iterate on per poll.

The batch methods themselves from BatchAwareBPH are really just a way to request and release a predictable amount of permits each time, and the CompositeBPH could have the batch size information.

On requestBatch or request calls, CBPH would delegate to the inner BPHs, either calling requestBatch directly or request(batchSize) if the instance is not BatchAware. We'd keep track of the number of permits each BPH returns and pick the smallest.

We'd then call release on the difference for each BPH and finally return the chosen number of permits.
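
A rough sketch of that request flow (method names follow the discussion above; the exact interfaces in the library may differ, and the remaining interface methods are omitted):

```java
import java.util.List;

// Sketch of the CompositeBackPressureHandler request flow described above:
// ask each handler for permits, keep the minimum, release the difference.
public class CompositeBackPressureHandler implements BatchAwareBackPressureHandler {

    private final List<BackPressureHandler> handlers;
    private final int batchSize;

    public CompositeBackPressureHandler(List<BackPressureHandler> handlers, int batchSize) {
        this.handlers = handlers;
        this.batchSize = batchSize;
    }

    @Override
    public int requestBatch() throws InterruptedException {
        int[] obtained = new int[handlers.size()];
        int permits = batchSize;
        for (int i = 0; i < handlers.size(); i++) {
            BackPressureHandler handler = handlers.get(i);
            obtained[i] = handler instanceof BatchAwareBackPressureHandler batchAware
                    ? batchAware.requestBatch()
                    : handler.request(batchSize);
            permits = Math.min(permits, obtained[i]);
        }
        // Release the difference between what each handler granted and the chosen minimum.
        for (int i = 0; i < handlers.size(); i++) {
            if (obtained[i] > permits) {
                handlers.get(i).release(obtained[i] - permits);
            }
        }
        return permits;
    }

    // request, release, releaseBatch, getBatchSize, etc. omitted for brevity.
}
```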

The benefit I see for this approach is that we can keep each BPH implementation with its own independent logic, reacting to request and release calls independently, while not introducing a new interface, which would add complexity.

We could also in the future separate the Low / High Throughput logic into its own BPH to better manage complexity, and slim down SemaphoreBPH, which TBH would make me very happy.

Example Scenario

I'll illustrate with an example just as a sanity check:

Let's say we have 3 BPH implementations in the CompositeBPH:

  • SemaphoreBPH has 50 permits representing the app's maximum capacity
  • RateLimiterBPH has 20 permits / second representing a downstream API capacity
  • DownstreamQueueBPH has a variable number of permits depending on a downstream queue
  • Our batch size will be 20 for this example.

On the first poll, SBPH will return 20 permits, RLBPH will return 20 permits, and DQBPH, let's say, will return 20 permits. CBPH will then return 20 permits.

On the second poll, SBPH will return 20 permits, RLBPH will return 0 permits, and DQBPH will, let's say, return 20 permits again. CBPH will call release with 20 permits on SBPH, 0 permits on RLBPH, and 20 on DQBPH, and return 0 permits. We could of course short-circuit when the number of permits returned by a BPH is 0.

Let's say at some point the downstream API is holding requests with a poorly configured 10-second timeout. The RateLimiterBPH would keep allowing 20 permits / second, but the SemaphoreBPH would cap consumption at 50 concurrent messages, avoiding 200 simultaneous requests that might break both the app and the downstream API.

As the consumers release permits, CompositeBPH would delegate release calls accordingly to all implementations, some of which would likely be no-ops, such as in the RateLimiterBPH.

I don't think this is too different from what you are proposing, in that we're effectively limiting the number of permits returned, and the logic for CompositeBPH might even be similar to what you have in the BackPressureHandlerLimiter. But I think it simplifies things a bit by using only one interface (BPH), and adds more flexibility for the user, who can just add any number of BPH implementations that operate independently from each other.

Of course, there's a lot of complexity involved and I might be missing something.

Please let me know your thoughts.

Thanks.
