Skip to content

Commit

Permalink
Dynamically configure SemaphoreBackPressureHandler with BackPressureL…
Browse files Browse the repository at this point in the history
…imiter (#1251)
  • Loading branch information
loicrouchon committed Dec 20, 2024
1 parent 6acef01 commit 281baf7
Show file tree
Hide file tree
Showing 7 changed files with 562 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ public abstract class AbstractContainerOptions<O extends ContainerOptions<O, B>,

private final Duration maxDelayBetweenPolls;

private final Duration standbyLimitPollingInterval;

private final Duration listenerShutdownTimeout;

private final Duration acknowledgementShutdownTimeout;

private final BackPressureMode backPressureMode;

private final BackPressureLimiter backPressureLimiter;

private final ListenerMode listenerMode;

private final MessagingMessageConverter<?> messageConverter;
Expand Down Expand Up @@ -80,10 +84,12 @@ protected AbstractContainerOptions(Builder<?, ?> builder) {
this.autoStartup = builder.autoStartup;
this.pollTimeout = builder.pollTimeout;
this.pollBackOffPolicy = builder.pollBackOffPolicy;
this.standbyLimitPollingInterval = builder.standbyLimitPollingInterval;
this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls;
this.listenerShutdownTimeout = builder.listenerShutdownTimeout;
this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout;
this.backPressureMode = builder.backPressureMode;
this.backPressureLimiter = builder.backPressureLimiter;
this.listenerMode = builder.listenerMode;
this.messageConverter = builder.messageConverter;
this.acknowledgementMode = builder.acknowledgementMode;
Expand Down Expand Up @@ -122,6 +128,11 @@ public BackOffPolicy getPollBackOffPolicy() {
return this.pollBackOffPolicy;
}

@Override
public Duration getStandbyLimitPollingInterval() {
return this.standbyLimitPollingInterval;
}

@Override
public Duration getMaxDelayBetweenPolls() {
return this.maxDelayBetweenPolls;
Expand Down Expand Up @@ -154,6 +165,11 @@ public BackPressureMode getBackPressureMode() {
return this.backPressureMode;
}

@Override
public BackPressureLimiter getBackPressureLimiter() {
return this.backPressureLimiter;
}

@Override
public ListenerMode getListenerMode() {
return this.listenerMode;
Expand Down Expand Up @@ -206,6 +222,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private static final BackOffPolicy DEFAULT_POLL_BACK_OFF_POLICY = buildDefaultBackOffPolicy();

private static final Duration DEFAULT_STANDBY_LIMIT_POLLING_INTERVAL = Duration.ofMillis(100);

private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10);

private static final Duration DEFAULT_LISTENER_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20);
Expand All @@ -214,6 +232,8 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO;

private static final BackPressureLimiter DEFAULT_BACKPRESSURE_LIMITER = null;

private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE;

private static final MessagingMessageConverter<?> DEFAULT_MESSAGE_CONVERTER = new SqsMessagingMessageConverter();
Expand All @@ -230,10 +250,14 @@ protected abstract static class Builder<B extends ContainerOptionsBuilder<B, O>,

private BackOffPolicy pollBackOffPolicy = DEFAULT_POLL_BACK_OFF_POLICY;

private Duration standbyLimitPollingInterval = DEFAULT_STANDBY_LIMIT_POLLING_INTERVAL;

private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT;

private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION;

private BackPressureLimiter backPressureLimiter = DEFAULT_BACKPRESSURE_LIMITER;

private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT;

private Duration acknowledgementShutdownTimeout = DEFAULT_ACKNOWLEDGEMENT_SHUTDOWN_TIMEOUT;
Expand Down Expand Up @@ -272,6 +296,7 @@ protected Builder(AbstractContainerOptions<?, ?> options) {
this.listenerShutdownTimeout = options.listenerShutdownTimeout;
this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout;
this.backPressureMode = options.backPressureMode;
this.backPressureLimiter = options.backPressureLimiter;
this.listenerMode = options.listenerMode;
this.messageConverter = options.messageConverter;
this.acknowledgementMode = options.acknowledgementMode;
Expand Down Expand Up @@ -315,6 +340,13 @@ public B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
return self();
}

@Override
public B standbyLimitPollingInterval(Duration standbyLimitPollingInterval) {
Assert.notNull(standbyLimitPollingInterval, "standbyLimitPollingInterval cannot be null");
this.standbyLimitPollingInterval = standbyLimitPollingInterval;
return self();
}

@Override
public B maxDelayBetweenPolls(Duration maxDelayBetweenPolls) {
Assert.notNull(maxDelayBetweenPolls, "semaphoreAcquireTimeout cannot be null");
Expand Down Expand Up @@ -364,6 +396,12 @@ public B backPressureMode(BackPressureMode backPressureMode) {
return self();
}

@Override
public B backPressureLimiter(BackPressureLimiter backPressureLimiter) {
this.backPressureLimiter = backPressureLimiter;
return self();
}

@Override
public B acknowledgementInterval(Duration acknowledgementInterval) {
Assert.notNull(acknowledgementInterval, "acknowledgementInterval cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,13 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
}

protected BackPressureHandler createBackPressureHandler() {
return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll())
.totalPermits(getContainerOptions().getMaxConcurrentMessages())
.acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls())
.throughputConfiguration(getContainerOptions().getBackPressureMode()).build();
O containerOptions = getContainerOptions();
return SemaphoreBackPressureHandler.builder().batchSize(containerOptions.getMaxMessagesPerPoll())
.totalPermits(containerOptions.getMaxConcurrentMessages())
.standbyLimitPollingInterval(containerOptions.getStandbyLimitPollingInterval())
.acquireTimeout(containerOptions.getMaxDelayBetweenPolls())
.throughputConfiguration(containerOptions.getBackPressureMode())
.backPressureLimiter(containerOptions.getBackPressureLimiter()).build();
}

protected TaskExecutor createSourcesTaskExecutor() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.listener;

/**
* The BackPressureLimiter enables a dynamic reduction of the queues consumption capacity depending on external factors.
*/
public interface BackPressureLimiter {

/**
* {@return the limit to be applied to the queue consumption.}
*
* The limit can be used to reduce the queue consumption capabilities of the next polling attempts. The container
* will work toward satisfying the limit by decreasing the maximum number of concurrent messages that can ve
* processed.
*
* The following values will have the following effects:
*
* <ul>
* <li>zero or negative limits will stop consumption from the queue. When such a situation occurs, the queue
* processing is said to be on "standby".</li>
* <li>Values >= 1 and < {@link ContainerOptions#getMaxConcurrentMessages()} will reduce the queue consumption
* capabilities of the next polling attempts.</li>
* <li>Values >= {@link ContainerOptions#getMaxConcurrentMessages()} will not reduce the queue consumption
* capabilities</li>
* </ul>
*
* Note: the adjustment will require a few polling cycles to be in effect.
*/
int limit();
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ public interface ContainerOptions<O extends ContainerOptions<O, B>, B extends Co
boolean isAutoStartup();

/**
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
* {@return the amount of time to wait before checking again for the current limit when the queue processing is on
* standby} Default is 100 milliseconds.
*
* @see BackPressureLimiter#limit()
*/
Duration getStandbyLimitPollingInterval();

/**
* Sets the maximum time the polling thread should wait for a full batch of permits to be available before trying to
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.
* Default is 10 seconds.
*
Expand Down Expand Up @@ -127,6 +135,12 @@ default BackOffPolicy getPollBackOffPolicy() {
*/
BackPressureMode getBackPressureMode();

/**
* Return the {@link BackPressureLimiter} for this container.
* @return the backpressure limiter.
*/
BackPressureLimiter getBackPressureLimiter();

/**
* Return the {@link ListenerMode} mode for this container.
* @return the listener mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ public interface ContainerOptionsBuilder<B extends ContainerOptionsBuilder<B, O>
*/
B autoStartup(boolean autoStartup);

/**
* Sets the amount of time to wait before checking again for the current limit when the queue processing is on
* standby.
*
* @param standbyLimitPollingInterval the limit polling interval when the queue processing is on standby.
* @return this instance.
* @see BackPressureLimiter#limit()
*/
B standbyLimitPollingInterval(Duration standbyLimitPollingInterval);

/**
* Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to
* acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available.
Expand Down Expand Up @@ -145,6 +155,14 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) {
*/
B backPressureMode(BackPressureMode backPressureMode);

/**
* Set the {@link BackPressureLimiter} for this container. Default is {@code null}.
*
* @param backPressureLimiter the backpressure limiter.
* @return this instance.
*/
B backPressureLimiter(BackPressureLimiter backPressureLimiter);

/**
* Set the maximum interval between acknowledgements for batch acknowledgements. The default depends on the specific
* {@link ContainerComponentFactory} implementation.
Expand Down
Loading

0 comments on commit 281baf7

Please sign in to comment.