Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acknowledgement mode on @SqsListener annotation #870

Merged
merged 11 commits into from
Nov 4, 2023
6 changes: 5 additions & 1 deletion docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,9 @@ Note that if there are messages available the call may return earlier than this
- `messageVisibilitySeconds` - Set the minimum visibility for the messages retrieved in a poll.
Note that for `FIFO` single message listener methods, this visibility is applied to the whole batch before each message is sent to the listener.
See <<FIFO Support>> for more information.

- `acknowledgementMode` - Set the acknowledgement mode for the container.
If any value is set, it will take precedence over the acknowledgement mode defined for the container factory options.
See <<Acknowledgement Mode>> for more information.

===== Listener Method Arguments

Expand Down Expand Up @@ -1326,6 +1328,8 @@ NOTE: All options are available for both `single message` and `batch` message li
- `ALWAYS` - Acknowledges a message or batch of messages after processing returns success or error.
- `MANUAL` - The framework won't acknowledge messages automatically and `Acknowledgement` objects can be received in the listener method.

The `Acknowledgement` strategy can be configured in the `SqsContainerOptions` or in the `@SqsListener` annotation.

==== Acknowledgement Batching

The `acknowledgementInterval` and `acknowledgementThreshold` options enable acknowledgement batching.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,10 @@
import io.awspring.cloud.sqs.config.HandlerMethodEndpoint;
import io.awspring.cloud.sqs.config.SqsEndpoint;
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import io.awspring.cloud.sqs.support.resolver.AcknowledgmentHandlerMethodArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.BatchAcknowledgmentArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.BatchPayloadMethodArgumentResolver;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -69,11 +57,26 @@
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* {@link BeanPostProcessor} implementation that scans beans for a {@link SqsListener @SqsListener} annotation, extracts
* information to a {@link SqsEndpoint}, and registers it in the {@link EndpointRegistrar}.
*
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 3.0
*/
public abstract class AbstractListenerAnnotationBeanPostProcessor<A extends Annotation>
Expand Down Expand Up @@ -219,6 +222,17 @@ protected Integer resolveAsInteger(String value, String propertyName) {
}
}

@Nullable
protected AcknowledgementMode resolveAcknowledgement(String value) {
try {
final String resolvedValue = resolveAsString(value, "acknowledgementMode");
return StringUtils.hasText(resolvedValue) ? AcknowledgementMode.valueOf(resolvedValue) : null;
}
catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Cannot resolve " + value + " as AcknowledgementMode", e);
}
}

protected String getEndpointId(String id) {
if (StringUtils.hasText(id)) {
return resolveAsString(id, "id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
* @author Alain Sahli
* @author Matej Nedic
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 1.1
*/
@Target(ElementType.METHOD)
Expand Down Expand Up @@ -137,4 +138,10 @@
*/
String messageVisibilitySeconds() default "";

/**
* The acknowledgement mode to be used for the provided queues. If not specified, the acknowledgement mode defined
* for the container factory will be used.
*/
String acknowledgementMode() default "";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.annotation;

import io.awspring.cloud.sqs.listener.acknowledgement.handler.AlwaysAcknowledgementHandler;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.NeverAcknowledgementHandler;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.OnSuccessAcknowledgementHandler;

/**
* Acknowledgement strategies supported by the {@link SqsListener} annotation.
*
* @author Joao Calassio
* @since 3.1
* @see OnSuccessAcknowledgementHandler
* @see AlwaysAcknowledgementHandler
* @see NeverAcknowledgementHandler
* @see io.awspring.cloud.sqs.listener.ContainerOptions
* @see SqsListener
*/
public class SqsListenerAcknowledgementMode {

/**
* Messages will be acknowledged when message processing is successful.
*/
public static final String ON_SUCCESS = "ON_SUCCESS";

/**
* Messages will be acknowledged whether processing was completed successfully or with an error.
*/
public static final String ALWAYS = "ALWAYS";

/**
* Messages will not be acknowledged automatically by the container.
* @see io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement
*/
public static final String MANUAL = "MANUAL";

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* {@link AbstractListenerAnnotationBeanPostProcessor} implementation for {@link SqsListener @SqsListener}.
*
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 3.0
*/
public class SqsListenerAnnotationBeanPostProcessor extends AbstractListenerAnnotationBeanPostProcessor<SqsListener> {
Expand All @@ -51,7 +52,7 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
resolveAsInteger(sqsListenerAnnotation.maxConcurrentMessages(), "maxConcurrentMessages"))
.messageVisibility(
resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility"))
.build();
.acknowledgementMode(resolveAcknowledgement(sqsListenerAnnotation.acknowledgementMode())).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.awspring.cloud.sqs.config;

import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import java.time.Duration;
import java.util.Collection;
import org.springframework.lang.Nullable;
Expand All @@ -26,6 +27,7 @@
* Contains properties that should be mapped from {@link SqsListener @SqsListener} annotations.
*
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 3.0
*/
public class SqsEndpoint extends AbstractEndpoint {
Expand All @@ -38,12 +40,16 @@ public class SqsEndpoint extends AbstractEndpoint {

private final Integer maxMessagesPerPoll;

@Nullable
private final AcknowledgementMode acknowledgementMode;

protected SqsEndpoint(SqsEndpointBuilder builder) {
super(builder.queueNames, builder.factoryName, builder.id);
this.maxConcurrentMessages = builder.maxConcurrentMessages;
this.pollTimeoutSeconds = builder.pollTimeoutSeconds;
this.messageVisibility = builder.messageVisibility;
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
this.acknowledgementMode = builder.acknowledgementMode;
}

/**
Expand Down Expand Up @@ -91,6 +97,15 @@ public Duration getMessageVisibility() {
return this.messageVisibility != null ? Duration.ofSeconds(this.messageVisibility) : null;
}

/**
* Returns the acknowledgement mode configured for this endpoint.
* @return the acknowledgement mode.
*/
@Nullable
public AcknowledgementMode getAcknowledgementMode() {
return this.acknowledgementMode;
}

public static class SqsEndpointBuilder {

private Collection<String> queueNames;
Expand All @@ -107,6 +122,9 @@ public static class SqsEndpointBuilder {

private Integer maxMessagesPerPoll;

@Nullable
private AcknowledgementMode acknowledgementMode;

public SqsEndpointBuilder queueNames(Collection<String> queueNames) {
this.queueNames = queueNames;
return this;
Expand Down Expand Up @@ -142,6 +160,11 @@ public SqsEndpointBuilder id(String id) {
return this;
}

public SqsEndpointBuilder acknowledgementMode(@Nullable AcknowledgementMode acknowledgementMode) {
this.acknowledgementMode = acknowledgementMode;
return this;
}

public SqsEndpoint build() {
return new SqsEndpoint(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
* used.
*
* @author Tomaz Fernandes
* @author Joao Calassio
* @since 3.0
* @see SqsMessageListenerContainer
* @see ContainerOptions
Expand Down Expand Up @@ -161,7 +162,8 @@ private void configureFromSqsEndpoint(SqsEndpoint sqsEndpoint, SqsContainerOptio
ConfigUtils.INSTANCE.acceptIfNotNull(sqsEndpoint.getMaxConcurrentMessages(), options::maxConcurrentMessages)
.acceptIfNotNull(sqsEndpoint.getMaxMessagesPerPoll(), options::maxMessagesPerPoll)
.acceptIfNotNull(sqsEndpoint.getPollTimeout(), options::pollTimeout)
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility);
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility)
.acceptIfNotNull(sqsEndpoint.getAcknowledgementMode(), options::acknowledgementMode);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.annotation;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import java.lang.reflect.Field;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/**
* Tests for {@link SqsListenerAcknowledgementMode} enum values
*
* @author Joao Calassio
*/
class SqsListenerAcknowledgementModeTests {

@ParameterizedTest
@EnumSource(AcknowledgementMode.class)
void shouldHaveAllValuesOfAcknowledgementModeEnum(final AcknowledgementMode acknowledgementMode)
throws NoSuchFieldException, IllegalAccessException {
Class<SqsListenerAcknowledgementMode> clz = SqsListenerAcknowledgementMode.class;
Field correspondingValue = clz.getDeclaredField(acknowledgementMode.name());
assertEquals(acknowledgementMode.name(), correspondingValue.get(clz));
}

}
Loading
Loading