diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java index 81f4eb3f2..2662b0ade 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractContainerOptions.java @@ -47,12 +47,16 @@ public abstract class AbstractContainerOptions, private final Duration maxDelayBetweenPolls; + private final Duration standbyLimitPollingInterval; + private final Duration listenerShutdownTimeout; private final Duration acknowledgementShutdownTimeout; private final BackPressureMode backPressureMode; + private final BackPressureLimiter backPressureLimiter; + private final ListenerMode listenerMode; private final MessagingMessageConverter messageConverter; @@ -80,10 +84,12 @@ protected AbstractContainerOptions(Builder builder) { this.autoStartup = builder.autoStartup; this.pollTimeout = builder.pollTimeout; this.pollBackOffPolicy = builder.pollBackOffPolicy; + this.standbyLimitPollingInterval = builder.standbyLimitPollingInterval; this.maxDelayBetweenPolls = builder.maxDelayBetweenPolls; this.listenerShutdownTimeout = builder.listenerShutdownTimeout; this.acknowledgementShutdownTimeout = builder.acknowledgementShutdownTimeout; this.backPressureMode = builder.backPressureMode; + this.backPressureLimiter = builder.backPressureLimiter; this.listenerMode = builder.listenerMode; this.messageConverter = builder.messageConverter; this.acknowledgementMode = builder.acknowledgementMode; @@ -122,6 +128,11 @@ public BackOffPolicy getPollBackOffPolicy() { return this.pollBackOffPolicy; } + @Override + public Duration getStandbyLimitPollingInterval() { + return this.standbyLimitPollingInterval; + } + @Override public Duration getMaxDelayBetweenPolls() { return this.maxDelayBetweenPolls; @@ -154,6 +165,11 @@ public BackPressureMode getBackPressureMode() { return this.backPressureMode; } + @Override + public BackPressureLimiter getBackPressureLimiter() { + return this.backPressureLimiter; + } + @Override public ListenerMode getListenerMode() { return this.listenerMode; @@ -206,6 +222,8 @@ protected abstract static class Builder, private static final BackOffPolicy DEFAULT_POLL_BACK_OFF_POLICY = buildDefaultBackOffPolicy(); + private static final Duration DEFAULT_STANDBY_LIMIT_POLLING_INTERVAL = Duration.ofMillis(100); + private static final Duration DEFAULT_SEMAPHORE_TIMEOUT = Duration.ofSeconds(10); private static final Duration DEFAULT_LISTENER_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20); @@ -214,6 +232,8 @@ protected abstract static class Builder, private static final BackPressureMode DEFAULT_THROUGHPUT_CONFIGURATION = BackPressureMode.AUTO; + private static final BackPressureLimiter DEFAULT_BACKPRESSURE_LIMITER = null; + private static final ListenerMode DEFAULT_MESSAGE_DELIVERY_STRATEGY = ListenerMode.SINGLE_MESSAGE; private static final MessagingMessageConverter DEFAULT_MESSAGE_CONVERTER = new SqsMessagingMessageConverter(); @@ -230,10 +250,14 @@ protected abstract static class Builder, private BackOffPolicy pollBackOffPolicy = DEFAULT_POLL_BACK_OFF_POLICY; + private Duration standbyLimitPollingInterval = DEFAULT_STANDBY_LIMIT_POLLING_INTERVAL; + private Duration maxDelayBetweenPolls = DEFAULT_SEMAPHORE_TIMEOUT; private BackPressureMode backPressureMode = DEFAULT_THROUGHPUT_CONFIGURATION; + private BackPressureLimiter backPressureLimiter = DEFAULT_BACKPRESSURE_LIMITER; + private Duration listenerShutdownTimeout = DEFAULT_LISTENER_SHUTDOWN_TIMEOUT; private Duration acknowledgementShutdownTimeout = DEFAULT_ACKNOWLEDGEMENT_SHUTDOWN_TIMEOUT; @@ -272,6 +296,7 @@ protected Builder(AbstractContainerOptions options) { this.listenerShutdownTimeout = options.listenerShutdownTimeout; this.acknowledgementShutdownTimeout = options.acknowledgementShutdownTimeout; this.backPressureMode = options.backPressureMode; + this.backPressureLimiter = options.backPressureLimiter; this.listenerMode = options.listenerMode; this.messageConverter = options.messageConverter; this.acknowledgementMode = options.acknowledgementMode; @@ -315,6 +340,13 @@ public B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) { return self(); } + @Override + public B standbyLimitPollingInterval(Duration standbyLimitPollingInterval) { + Assert.notNull(standbyLimitPollingInterval, "standbyLimitPollingInterval cannot be null"); + this.standbyLimitPollingInterval = standbyLimitPollingInterval; + return self(); + } + @Override public B maxDelayBetweenPolls(Duration maxDelayBetweenPolls) { Assert.notNull(maxDelayBetweenPolls, "semaphoreAcquireTimeout cannot be null"); @@ -364,6 +396,12 @@ public B backPressureMode(BackPressureMode backPressureMode) { return self(); } + @Override + public B backPressureLimiter(BackPressureLimiter backPressureLimiter) { + this.backPressureLimiter = backPressureLimiter; + return self(); + } + @Override public B acknowledgementInterval(Duration acknowledgementInterval) { Assert.notNull(acknowledgementInterval, "acknowledgementInterval cannot be null"); diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java index 6808f647a..79073c96c 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/AbstractPipelineMessageListenerContainer.java @@ -225,10 +225,13 @@ private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) { } protected BackPressureHandler createBackPressureHandler() { - return SemaphoreBackPressureHandler.builder().batchSize(getContainerOptions().getMaxMessagesPerPoll()) - .totalPermits(getContainerOptions().getMaxConcurrentMessages()) - .acquireTimeout(getContainerOptions().getMaxDelayBetweenPolls()) - .throughputConfiguration(getContainerOptions().getBackPressureMode()).build(); + O containerOptions = getContainerOptions(); + return SemaphoreBackPressureHandler.builder().batchSize(containerOptions.getMaxMessagesPerPoll()) + .totalPermits(containerOptions.getMaxConcurrentMessages()) + .standbyLimitPollingInterval(containerOptions.getStandbyLimitPollingInterval()) + .acquireTimeout(containerOptions.getMaxDelayBetweenPolls()) + .throughputConfiguration(containerOptions.getBackPressureMode()) + .backPressureLimiter(containerOptions.getBackPressureLimiter()).build(); } protected TaskExecutor createSourcesTaskExecutor() { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureLimiter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureLimiter.java new file mode 100644 index 000000000..f85ddba82 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/BackPressureLimiter.java @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.listener; + +/** + * The BackPressureLimiter enables a dynamic reduction of the queues consumption capacity depending on external factors. + */ +public interface BackPressureLimiter { + + /** + * {@return the limit to be applied to the queue consumption.} + * + * The limit can be used to reduce the queue consumption capabilities of the next polling attempts. The container + * will work toward satisfying the limit by decreasing the maximum number of concurrent messages that can ve + * processed. + * + * The following values will have the following effects: + * + *
    + *
  • zero or negative limits will stop consumption from the queue. When such a situation occurs, the queue + * processing is said to be on "standby".
  • + *
  • Values >= 1 and < {@link ContainerOptions#getMaxConcurrentMessages()} will reduce the queue consumption + * capabilities of the next polling attempts.
  • + *
  • Values >= {@link ContainerOptions#getMaxConcurrentMessages()} will not reduce the queue consumption + * capabilities
  • + *
+ * + * Note: the adjustment will require a few polling cycles to be in effect. + */ + int limit(); +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java index ad7313cf6..e78f967a6 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptions.java @@ -59,7 +59,15 @@ public interface ContainerOptions, B extends Co boolean isAutoStartup(); /** - * Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to + * {@return the amount of time to wait before checking again for the current limit when the queue processing is on + * standby} Default is 100 milliseconds. + * + * @see BackPressureLimiter#limit() + */ + Duration getStandbyLimitPollingInterval(); + + /** + * Sets the maximum time the polling thread should wait for a full batch of permits to be available before trying to * acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available. * Default is 10 seconds. * @@ -127,6 +135,12 @@ default BackOffPolicy getPollBackOffPolicy() { */ BackPressureMode getBackPressureMode(); + /** + * Return the {@link BackPressureLimiter} for this container. + * @return the backpressure limiter. + */ + BackPressureLimiter getBackPressureLimiter(); + /** * Return the {@link ListenerMode} mode for this container. * @return the listener mode. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java index 9d03b7964..de88c0464 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/ContainerOptionsBuilder.java @@ -56,6 +56,16 @@ public interface ContainerOptionsBuilder */ B autoStartup(boolean autoStartup); + /** + * Sets the amount of time to wait before checking again for the current limit when the queue processing is on + * standby. + * + * @param standbyLimitPollingInterval the limit polling interval when the queue processing is on standby. + * @return this instance. + * @see BackPressureLimiter#limit() + */ + B standbyLimitPollingInterval(Duration standbyLimitPollingInterval); + /** * Set the maximum time the polling thread should wait for a full batch of permits to be available before trying to * acquire a partial batch if so configured. A poll is only actually executed if at least one permit is available. @@ -145,6 +155,14 @@ default B pollBackOffPolicy(BackOffPolicy pollBackOffPolicy) { */ B backPressureMode(BackPressureMode backPressureMode); + /** + * Set the {@link BackPressureLimiter} for this container. Default is {@code null}. + * + * @param backPressureLimiter the backpressure limiter. + * @return this instance. + */ + B backPressureLimiter(BackPressureLimiter backPressureLimiter); + /** * Set the maximum interval between acknowledgements for batch acknowledgements. The default depends on the specific * {@link ContainerComponentFactory} implementation. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java index 310b64519..e3d069bce 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/SemaphoreBackPressureHandler.java @@ -17,9 +17,11 @@ import java.time.Duration; import java.util.Arrays; +import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -35,33 +37,63 @@ public class SemaphoreBackPressureHandler implements BatchAwareBackPressureHandl private static final Logger logger = LoggerFactory.getLogger(SemaphoreBackPressureHandler.class); - private final Semaphore semaphore; + private final BackPressureLimiter backPressureLimiter; + + private final ReducibleSemaphore semaphore; private final int batchSize; + /** + * The theoretical maximum numbers of permits that can be acquired if no limit is set. + * @see #permitsLimit for the current limit. + */ private final int totalPermits; + /** + * The limit of permits that can be acquired at the current time. The permits limit is defined in the [0, + * totalPermits] interval. A value of {@literal 0} means that no permits can be acquired. + *

+ * This value is updated based on the downstream backpressure reported by the {@link #backPressureLimiter}. + */ + private final AtomicInteger permitsLimit; + + /** + * The duration to sleep when the queue processing is in standby. + */ + private final Duration standbyLimitPollingInterval; + private final Duration acquireTimeout; private final BackPressureMode backPressureConfiguration; private volatile CurrentThroughputMode currentThroughputMode; + /** + * The number of permits acquired in low throughput mode. This value is minimum value between {@link #permitsLimit} + * at the time of the acquire and {@link #totalPermits}. + */ + private final AtomicInteger lowThroughputAcquiredPermits = new AtomicInteger(0); + private final AtomicBoolean hasAcquiredFullPermits = new AtomicBoolean(false); private String id; + private final AtomicBoolean isDraining = new AtomicBoolean(false); + private SemaphoreBackPressureHandler(Builder builder) { this.batchSize = builder.batchSize; this.totalPermits = builder.totalPermits; + this.standbyLimitPollingInterval = builder.standbyLimitPollingInterval; this.acquireTimeout = builder.acquireTimeout; this.backPressureConfiguration = builder.backPressureMode; - this.semaphore = new Semaphore(totalPermits); + this.semaphore = new ReducibleSemaphore(totalPermits); this.currentThroughputMode = BackPressureMode.FIXED_HIGH_THROUGHPUT.equals(backPressureConfiguration) ? CurrentThroughputMode.HIGH : CurrentThroughputMode.LOW; logger.debug("SemaphoreBackPressureHandler created with configuration {} and {} total permits", backPressureConfiguration, totalPermits); + this.permitsLimit = new AtomicInteger(totalPermits); + this.backPressureLimiter = Objects.requireNonNullElse(builder.backPressureLimiter, () -> totalPermits); } public static Builder builder() { @@ -80,15 +112,17 @@ public String getId() { @Override public int request(int amount) throws InterruptedException { + updateAvailablePermitsBasedOnDownstreamBackpressure(); return tryAcquire(amount, this.currentThroughputMode) ? amount : 0; } // @formatter:off @Override public int requestBatch() throws InterruptedException { - return CurrentThroughputMode.LOW.equals(this.currentThroughputMode) - ? requestInLowThroughputMode() - : requestInHighThroughputMode(); + updateAvailablePermitsBasedOnDownstreamBackpressure(); + boolean useLowThroughput = CurrentThroughputMode.LOW.equals(this.currentThroughputMode) + || this.permitsLimit.get() < this.totalPermits; + return useLowThroughput ? requestInLowThroughputMode() : requestInHighThroughputMode(); } private int requestInHighThroughputMode() throws InterruptedException { @@ -103,10 +137,10 @@ private int tryAcquirePartial() throws InterruptedException { if (availablePermits == 0 || BackPressureMode.ALWAYS_POLL_MAX_MESSAGES.equals(this.backPressureConfiguration)) { return 0; } - int permitsToRequest = Math.min(availablePermits, this.batchSize); + int permitsToRequest = min(availablePermits, this.batchSize); CurrentThroughputMode currentThroughputModeNow = this.currentThroughputMode; - logger.trace("Trying to acquire partial batch of {} permits from {} available for {} in TM {}", - permitsToRequest, availablePermits, this.id, currentThroughputModeNow); + logger.trace("Trying to acquire partial batch of {} permits from {} (limit {}) available for {} in TM {}", + permitsToRequest, availablePermits, this.permitsLimit.get(), this.id, currentThroughputModeNow); boolean hasAcquiredPartial = tryAcquire(permitsToRequest, currentThroughputModeNow); return hasAcquiredPartial ? permitsToRequest : 0; } @@ -114,17 +148,35 @@ private int tryAcquirePartial() throws InterruptedException { private int requestInLowThroughputMode() throws InterruptedException { // Although LTM can be set / unset by many processes, only the MessageSource thread gets here, // so no actual concurrency - logger.debug("Trying to acquire full permits for {}. Permits left: {}", this.id, - this.semaphore.availablePermits()); - boolean hasAcquired = tryAcquire(this.totalPermits, CurrentThroughputMode.LOW); + logger.debug("Trying to acquire full permits for {}. Permits left: {}, Permits limit: {}", this.id, + this.semaphore.availablePermits(), this.permitsLimit.get()); + int permitsToRequest = min(this.permitsLimit.get(), this.totalPermits); + if (permitsToRequest == 0) { + logger.info("No permits usable for {} (limit = 0), sleeping for {}", this.id, + this.standbyLimitPollingInterval); + Thread.sleep(standbyLimitPollingInterval.toMillis()); + return 0; + } + boolean hasAcquired = tryAcquire(permitsToRequest, CurrentThroughputMode.LOW); if (hasAcquired) { - logger.debug("Acquired full permits for {}. Permits left: {}", this.id, this.semaphore.availablePermits()); + if (permitsToRequest >= this.totalPermits) { + logger.debug("Acquired full permits for {}. Permits left: {}, Permits limit: {}", this.id, + this.semaphore.availablePermits(), this.permitsLimit.get()); + } + else { + logger.debug("Acquired limited permits ({}) for {} . Permits left: {}, Permits limit: {}", + permitsToRequest, this.id, this.semaphore.availablePermits(), this.permitsLimit.get()); + } + int tokens = min(this.batchSize, permitsToRequest); // We've acquired all permits - there's no other process currently processing messages if (!this.hasAcquiredFullPermits.compareAndSet(false, true)) { - logger.warn("hasAcquiredFullPermits was already true. Permits left: {}", - this.semaphore.availablePermits()); + logger.warn("hasAcquiredFullPermits was already true. Permits left: {}, Permits limit: {}", + this.semaphore.availablePermits(), this.permitsLimit.get()); } - return this.batchSize; + else { + lowThroughputAcquiredPermits.set(permitsToRequest); + } + return tokens; } else { return 0; @@ -132,16 +184,20 @@ private int requestInLowThroughputMode() throws InterruptedException { } private boolean tryAcquire(int amount, CurrentThroughputMode currentThroughputModeNow) throws InterruptedException { + if (isDraining.get()) { + return false; + } logger.trace("Acquiring {} permits for {} in TM {}", amount, this.id, this.currentThroughputMode); boolean hasAcquired = this.semaphore.tryAcquire(amount, this.acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); if (hasAcquired) { - logger.trace("{} permits acquired for {} in TM {}. Permits left: {}", amount, this.id, - currentThroughputModeNow, this.semaphore.availablePermits()); + logger.trace("{} permits acquired for {} in TM {}. Permits left: {}, Permits limit: {}", amount, this.id, + currentThroughputModeNow, this.semaphore.availablePermits(), this.permitsLimit.get()); } else { - logger.trace("Not able to acquire {} permits in {} milliseconds for {} in TM {}. Permits left: {}", amount, - this.acquireTimeout.toMillis(), this.id, currentThroughputModeNow, - this.semaphore.availablePermits()); + logger.trace( + "Not able to acquire {} permits in {} milliseconds for {} in TM {}. Permits left: {}, Permits limit: {}", + amount, this.acquireTimeout.toMillis(), this.id, currentThroughputModeNow, + this.semaphore.availablePermits(), this.permitsLimit.get()); } return hasAcquired; } @@ -181,11 +237,13 @@ public void release(int amount) { } private int getPermitsToRelease(int amount) { - return this.hasAcquiredFullPermits.compareAndSet(true, false) - // The first process that gets here should release all permits except for inflight messages - // We can have only one batch of messages at this point since we have all permits - ? this.totalPermits - (this.batchSize - amount) - : amount; + if (this.hasAcquiredFullPermits.compareAndSet(true, false)) { + int allAcquiredPermits = this.lowThroughputAcquiredPermits.getAndSet(0); + // The first process that gets here should release all permits except for inflight messages + // We can have only one batch of messages at this point since we have all permits + return (allAcquiredPermits - (min(this.batchSize, allAcquiredPermits) - amount)); + } + return amount; } private void maybeSwitchToHighThroughputMode(int amount) { @@ -200,6 +258,8 @@ private void maybeSwitchToHighThroughputMode(int amount) { public boolean drain(Duration timeout) { logger.debug("Waiting for up to {} seconds for approx. {} permits to be released for {}", timeout.getSeconds(), this.totalPermits - this.semaphore.availablePermits(), this.id); + isDraining.set(true); + updateMaxPermitsLimit(this.totalPermits); try { return this.semaphore.tryAcquire(this.totalPermits, (int) timeout.getSeconds(), TimeUnit.SECONDS); } @@ -209,6 +269,44 @@ public boolean drain(Duration timeout) { } } + private int min(int a, int p) { + return Math.max(0, Math.min(a, p)); + } + + private void updateAvailablePermitsBasedOnDownstreamBackpressure() { + if (!isDraining.get()) { + int limit = backPressureLimiter.limit(); + int newCurrentMaxPermits = min(limit, totalPermits); + updateMaxPermitsLimit(newCurrentMaxPermits); + if (isDraining.get()) { + updateMaxPermitsLimit(totalPermits); + } + } + } + + private void updateMaxPermitsLimit(int newCurrentMaxPermits) { + int oldValue = permitsLimit.getAndUpdate(i -> min(newCurrentMaxPermits, totalPermits)); + if (newCurrentMaxPermits < oldValue) { + int blockedPermits = oldValue - newCurrentMaxPermits; + semaphore.reducePermits(blockedPermits); + } + else if (newCurrentMaxPermits > oldValue) { + int releasedPermits = newCurrentMaxPermits - oldValue; + semaphore.release(releasedPermits); + } + } + + private static class ReducibleSemaphore extends Semaphore { + ReducibleSemaphore(int permits) { + super(permits); + } + + @Override + public void reducePermits(int reduction) { + super.reducePermits(reduction); + } + } + private enum CurrentThroughputMode { HIGH, @@ -223,10 +321,14 @@ public static class Builder { private int totalPermits; + private Duration standbyLimitPollingInterval; + private Duration acquireTimeout; private BackPressureMode backPressureMode; + private BackPressureLimiter backPressureLimiter; + public Builder batchSize(int batchSize) { this.batchSize = batchSize; return this; @@ -237,6 +339,11 @@ public Builder totalPermits(int totalPermits) { return this; } + public Builder standbyLimitPollingInterval(Duration standbyLimitPollingInterval) { + this.standbyLimitPollingInterval = standbyLimitPollingInterval; + return this; + } + public Builder acquireTimeout(Duration acquireTimeout) { this.acquireTimeout = acquireTimeout; return this; @@ -247,10 +354,14 @@ public Builder throughputConfiguration(BackPressureMode backPressureConfiguratio return this; } + public Builder backPressureLimiter(BackPressureLimiter backPressureLimiter) { + this.backPressureLimiter = backPressureLimiter; + return this; + } + public SemaphoreBackPressureHandler build() { - Assert.noNullElements( - Arrays.asList(this.batchSize, this.totalPermits, this.acquireTimeout, this.backPressureMode), - "Missing configuration"); + Assert.noNullElements(Arrays.asList(this.batchSize, this.totalPermits, this.standbyLimitPollingInterval, + this.acquireTimeout, this.backPressureMode), "Missing configuration"); return new SemaphoreBackPressureHandler(this); } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java index 50bded839..b9834b338 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java @@ -27,6 +27,7 @@ import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration; import io.awspring.cloud.sqs.config.SqsListenerConfigurer; import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory; +import io.awspring.cloud.sqs.listener.BackPressureLimiter; import io.awspring.cloud.sqs.listener.BatchVisibility; import io.awspring.cloud.sqs.listener.ContainerComponentFactory; import io.awspring.cloud.sqs.listener.MessageListenerContainer; @@ -55,17 +56,24 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Random; import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntUnaryOperator; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -269,6 +277,7 @@ void manuallyCreatesInactiveContainer() throws Exception { logger.debug("Sent message to queue {} with messageBody {}", MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME, messageBody); assertThat(latchContainer.manuallyInactiveCreatedContainerLatch.await(10, TimeUnit.SECONDS)).isTrue(); + inactiveMessageListenerContainer.stop(); } // @formatter:off @@ -349,6 +358,298 @@ void maxConcurrentMessages() { assertDoesNotThrow(() -> latchContainer.maxConcurrentMessagesBarrier.await(10, TimeUnit.SECONDS)); } + static final class Limiter implements BackPressureLimiter { + private final AtomicInteger limit; + + Limiter(int max) { + limit = new AtomicInteger(max); + } + + public void setLimit(int value) { + logger.info("adjusting limit from {} to {}", limit.get(), value); + limit.set(value); + } + + @Override + public int limit() { + return Math.max(0, limit.get()); + } + } + + @ParameterizedTest + @CsvSource({ "2,2", "4,4", "5,5", "20,5" }) + void staticBackPressureLimitShouldCapQueueProcessingCapacity(int staticLimit, int expectedMaxConcurrentRequests) + throws Exception { + AtomicInteger concurrentRequest = new AtomicInteger(); + AtomicInteger maxConcurrentRequest = new AtomicInteger(); + Limiter limiter = new Limiter(staticLimit); + String queueName = "BACK_PRESSURE_LIMITER_QUEUE_NAME_STATIC_LIMIT_" + staticLimit; + IntStream.range(0, 10).forEach(index -> { + List> messages = create10Messages("staticBackPressureLimit" + staticLimit); + sqsTemplate.sendMany(queueName, messages); + }); + logger.debug("Sent 100 messages to queue {}", queueName); + var latch = new CountDownLatch(100); + var container = SqsMessageListenerContainer.builder().sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient()) + .queueNames(queueName).configure(options -> options.pollTimeout(Duration.ofSeconds(1)) + .maxConcurrentMessages(5).maxMessagesPerPoll(5).backPressureLimiter(limiter)) + .messageListener(msg -> { + int concurrentRqs = concurrentRequest.incrementAndGet(); + maxConcurrentRequest.updateAndGet(max -> Math.max(max, concurrentRqs)); + sleep(50L); + logger.debug("concurrent rq {}, max concurrent rq {}, latch count {}", concurrentRequest.get(), + maxConcurrentRequest.get(), latch.getCount()); + latch.countDown(); + concurrentRequest.decrementAndGet(); + }).build(); + container.start(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(maxConcurrentRequest.get()).isEqualTo(expectedMaxConcurrentRequests); + container.stop(); + } + + @Test + void zeroBackPressureLimitShouldStopQueueProcessing() throws Exception { + AtomicInteger concurrentRequest = new AtomicInteger(); + AtomicInteger maxConcurrentRequest = new AtomicInteger(); + Limiter limiter = new Limiter(0); + String queueName = "BACK_PRESSURE_LIMITER_QUEUE_NAME_STATIC_LIMIT_0"; + IntStream.range(0, 10).forEach(index -> { + List> messages = create10Messages("staticBackPressureLimit0"); + sqsTemplate.sendMany(queueName, messages); + }); + logger.debug("Sent 100 messages to queue {}", queueName); + var latch = new CountDownLatch(100); + var container = SqsMessageListenerContainer.builder().sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient()) + .queueNames(queueName).configure(options -> options.pollTimeout(Duration.ofSeconds(1)) + .maxConcurrentMessages(5).maxMessagesPerPoll(5).backPressureLimiter(limiter)) + .messageListener(msg -> { + int concurrentRqs = concurrentRequest.incrementAndGet(); + maxConcurrentRequest.updateAndGet(max -> Math.max(max, concurrentRqs)); + sleep(50L); + logger.debug("concurrent rq {}, max concurrent rq {}, latch count {}", concurrentRequest.get(), + maxConcurrentRequest.get(), latch.getCount()); + latch.countDown(); + concurrentRequest.decrementAndGet(); + }).build(); + container.start(); + assertThat(latch.await(2, TimeUnit.SECONDS)).isFalse(); + assertThat(maxConcurrentRequest.get()).isZero(); + assertThat(latch.getCount()).isEqualTo(100L); + container.stop(); + } + + @Test + void changeInBackPressureLimitShouldAdaptQueueProcessingCapacity() throws Exception { + AtomicInteger concurrentRequest = new AtomicInteger(); + AtomicInteger maxConcurrentRequest = new AtomicInteger(); + Limiter limiter = new Limiter(5); + String queueName = "BACK_PRESSURE_LIMITER_QUEUE_NAME_SYNC_ADAPTIVE_LIMIT"; + int nbMessages = 280; + IntStream.range(0, nbMessages / 10).forEach(index -> { + List> messages = create10Messages("syncAdaptiveBackPressureLimit"); + sqsTemplate.sendMany(queueName, messages); + }); + logger.debug("Sent {} messages to queue {}", nbMessages, queueName); + var latch = new CountDownLatch(nbMessages); + var controlSemaphore = new Semaphore(0); + var advanceSemaphore = new Semaphore(0); + var container = SqsMessageListenerContainer.builder().sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient()) + .queueNames(queueName).configure(options -> options.pollTimeout(Duration.ofSeconds(1)) + .maxConcurrentMessages(5).maxMessagesPerPoll(5).backPressureLimiter(limiter)) + .messageListener(msg -> { + try { + controlSemaphore.acquire(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + int concurrentRqs = concurrentRequest.incrementAndGet(); + maxConcurrentRequest.updateAndGet(max -> Math.max(max, concurrentRqs)); + latch.countDown(); + logger.debug("concurrent rq {}, max concurrent rq {}, latch count {}", concurrentRequest.get(), + maxConcurrentRequest.get(), latch.getCount()); + sleep(10L); + concurrentRequest.decrementAndGet(); + advanceSemaphore.release(); + }).build(); + class Controller { + private final Semaphore advanceSemaphore; + private final Semaphore controlSemaphore; + private final Limiter limiter; + private final AtomicInteger maxConcurrentRequest; + + Controller(Semaphore advanceSemaphore, Semaphore controlSemaphore, Limiter limiter, + AtomicInteger maxConcurrentRequest) { + this.advanceSemaphore = advanceSemaphore; + this.controlSemaphore = controlSemaphore; + this.limiter = limiter; + this.maxConcurrentRequest = maxConcurrentRequest; + } + + public void updateLimit(int newLimit) { + limiter.setLimit(newLimit); + } + + void updateLimitAndWaitForReset(int newLimit) throws InterruptedException { + updateLimit(newLimit); + int atLeastTwoPollingCycles = 2 * 5; + controlSemaphore.release(atLeastTwoPollingCycles); + waitForAdvance(atLeastTwoPollingCycles); + maxConcurrentRequest.set(0); + } + + void advance(int permits) { + controlSemaphore.release(permits); + } + + void waitForAdvance(int permits) throws InterruptedException { + assertThat(advanceSemaphore.tryAcquire(permits, 5, TimeUnit.SECONDS)) + .withFailMessage(() -> "Waiting for %d permits timed out. Only %d permits available" + .formatted(permits, advanceSemaphore.availablePermits())) + .isTrue(); + } + } + var controller = new Controller(advanceSemaphore, controlSemaphore, limiter, maxConcurrentRequest); + try { + container.start(); + + controller.advance(50); + controller.waitForAdvance(50); + // not limiting queue processing capacity + assertThat(controller.maxConcurrentRequest.get()).isEqualTo(5); + controller.updateLimitAndWaitForReset(2); + controller.advance(50); + + controller.waitForAdvance(50); + // limiting queue processing capacity + assertThat(controller.maxConcurrentRequest.get()).isEqualTo(2); + controller.updateLimitAndWaitForReset(7); + controller.advance(50); + + controller.waitForAdvance(50); + // not limiting queue processing capacity + assertThat(controller.maxConcurrentRequest.get()).isEqualTo(5); + controller.updateLimitAndWaitForReset(3); + controller.advance(50); + sleep(10L); + limiter.setLimit(1); + sleep(10L); + limiter.setLimit(2); + sleep(10L); + limiter.setLimit(3); + + controller.waitForAdvance(50); + assertThat(controller.maxConcurrentRequest.get()).isEqualTo(3); + // stopping processing of the queue + controller.updateLimit(0); + controller.advance(50); + assertThat(advanceSemaphore.tryAcquire(10, 5, TimeUnit.SECONDS)) + .withFailMessage("Acquiring semaphore should have timed out as limit was set to 0").isFalse(); + + // resume queue processing + controller.updateLimit(6); + + controller.waitForAdvance(50); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(controller.maxConcurrentRequest.get()).isEqualTo(5); + } + finally { + container.stop(); + } + } + + /** + * This test simulates a progressive change in the back pressure limit. Unlike + * {@link #changeInBackPressureLimitShouldAdaptQueueProcessingCapacity()}, this test does not block message + * consumption while updating the limit. + *

+ * The limit is updated in a loop until all messages are consumed. The update follows a triangle wave pattern with a + * minimum of 0, a maximum of 15, and a period of 30 iterations. After each update of the limit, the test waits up + * to 10ms and samples the maximum number of concurrent messages that were processed since the update. This number + * can be higher than the defined limit during the adaptation period of the decreasing limit wave. For the + * increasing limit wave, it is usually lower due to the adaptation delay. In both cases, the maximum number of + * concurrent messages being processed rapidly converges toward the defined limit. + *

+ * The test passes if the sum of the sampled maximum number of concurrently processed messages is lower than the sum + * of the limits at those points in time. + */ + @Test + void unsynchronizedChangesInBackPressureLimitShouldAdaptQueueProcessingCapacity() throws Exception { + AtomicInteger concurrentRequest = new AtomicInteger(); + AtomicInteger maxConcurrentRequest = new AtomicInteger(); + Limiter limiter = new Limiter(0); + String queueName = "REACTIVE_BACK_PRESSURE_LIMITER_QUEUE_NAME_ADAPTIVE_LIMIT"; + int nbMessages = 1000; + Semaphore advanceSemaphore = new Semaphore(0); + IntStream.range(0, nbMessages / 10).forEach(index -> { + List> messages = create10Messages("reactAdaptiveBackPressureLimit"); + sqsTemplate.sendMany(queueName, messages); + }); + logger.debug("Sent {} messages to queue {}", nbMessages, queueName); + var latch = new CountDownLatch(nbMessages); + var container = SqsMessageListenerContainer.builder().sqsAsyncClient(BaseSqsIntegrationTest.createAsyncClient()) + .queueNames(queueName) + .configure(options -> options.pollTimeout(Duration.ofSeconds(1)) + .standbyLimitPollingInterval(Duration.ofMillis(1)).maxConcurrentMessages(10) + .maxMessagesPerPoll(10).backPressureLimiter(limiter)) + .messageListener(msg -> { + int currentConcurrentRq = concurrentRequest.incrementAndGet(); + maxConcurrentRequest.updateAndGet(max -> Math.max(max, currentConcurrentRq)); + sleep(ThreadLocalRandom.current().nextInt(10)); + latch.countDown(); + logger.debug("concurrent rq {}, max concurrent rq {}, latch count {}", concurrentRequest.get(), + maxConcurrentRequest.get(), latch.getCount()); + concurrentRequest.decrementAndGet(); + advanceSemaphore.release(); + }).build(); + IntUnaryOperator progressiveLimitChange = (int x) -> { + int period = 30; + int halfPeriod = period / 2; + if (x % period < halfPeriod) { + return (x % halfPeriod); + } + else { + return (halfPeriod - (x % halfPeriod)); + } + }; + try { + container.start(); + Random random = new Random(); + int limitsSum = 0; + int maxConcurrentRqSum = 0; + int changeLimitCount = 0; + while (latch.getCount() > 0 && changeLimitCount < nbMessages) { + changeLimitCount++; + int limit = progressiveLimitChange.applyAsInt(changeLimitCount); + limiter.setLimit(limit); + maxConcurrentRequest.set(0); + sleep(random.nextInt(10)); + int actualLimit = Math.min(10, limit); + int max = maxConcurrentRequest.getAndSet(0); + if (max > 0) { + // Ignore iterations where nothing was polled (messages consumption slower than iteration) + limitsSum += actualLimit; + maxConcurrentRqSum += max; + } + } + assertThat(maxConcurrentRqSum).isLessThanOrEqualTo(limitsSum); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + finally { + container.stop(); + } + } + + private static void sleep(long millis) { + try { + Thread.sleep(millis); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + static class ReceivesMessageListener { @Autowired