Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acknowledgement mode on @SqsListener annotation #870

Merged
merged 11 commits into from
Nov 4, 2023
4 changes: 4 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ Note that if there are messages available the call may return earlier than this
- `messageVisibilitySeconds` - Set the minimum visibility for the messages retrieved in a poll.
Note that for `FIFO` single message listener methods, this visibility is applied to the whole batch before each message is sent to the listener.
See <<FIFO Support>> for more information.
- `acknowledgementMode` - Set the acknowledgement mode for the container. Default value is `SqsListenerAcknowledgementMode.INHERIT`,
meaning that it will inherit the acknowledgement mode of the container factory. See <<Acknowledgement Mode>> for more information.


===== Listener Method Arguments
Expand Down Expand Up @@ -1326,6 +1328,8 @@ NOTE: All options are available for both `single message` and `batch` message li
- `ALWAYS` - Acknowledges a message or batch of messages after processing returns success or error.
- `MANUAL` - The framework won't acknowledge messages automatically and `Acknowledgement` objects can be received in the listener method.

The `Acknowledgement` strategy can be configured in the `SqsContainerOptions` or in the `@SqsListener` annotation.

==== Acknowledgement Batching

The `acknowledgementInterval` and `acknowledgementThreshold` options enable acknowledgement batching.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.awspring.cloud.sqs.config.HandlerMethodEndpoint;
import io.awspring.cloud.sqs.config.SqsEndpoint;
import io.awspring.cloud.sqs.config.SqsListenerConfigurer;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import io.awspring.cloud.sqs.support.resolver.AcknowledgmentHandlerMethodArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.BatchAcknowledgmentArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.BatchPayloadMethodArgumentResolver;
Expand Down Expand Up @@ -219,6 +220,16 @@ protected Integer resolveAsInteger(String value, String propertyName) {
}
}

@Nullable
protected AcknowledgementMode resolve(SqsListenerAcknowledgementMode value) {
try {
return value == SqsListenerAcknowledgementMode.INHERIT ? null : AcknowledgementMode.valueOf(value.name());
}
catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Cannot resolve " + value + " as AcknowledgementMode", e);
}
}

protected String getEndpointId(String id) {
if (StringUtils.hasText(id)) {
return resolveAsString(id, "id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,10 @@
*/
String messageVisibilitySeconds() default "";

/**
* The acknowledgement mode to be used for the provided queues. If not specified, the
* acknowledgement mode defined for the container factory will be used.
*/
SqsListenerAcknowledgementMode acknowledgmentMode() default SqsListenerAcknowledgementMode.INHERIT;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.annotation;

import io.awspring.cloud.sqs.listener.acknowledgement.handler.AlwaysAcknowledgementHandler;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.NeverAcknowledgementHandler;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.OnSuccessAcknowledgementHandler;

/**
* Configures the acknowledgement behavior for the container.
*
* @author Joao Calassio
* @since 3.1
* @see OnSuccessAcknowledgementHandler
* @see AlwaysAcknowledgementHandler
* @see NeverAcknowledgementHandler
* @see io.awspring.cloud.sqs.listener.ContainerOptions
* @see SqsListener
*/
public enum SqsListenerAcknowledgementMode {

/**
* Use acknowledge mode defined by the container.
*/
INHERIT,
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved

/**
* Messages will be acknowledged when message processing is successful.
*/
ON_SUCCESS,

/**
* Messages will be acknowledged whether processing was completed successfully or with
* an error.
*/
ALWAYS,

/**
* Messages will not be acknowledged automatically by the container.
* @see io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement
*/
MANUAL,

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected Endpoint createEndpoint(SqsListener sqsListenerAnnotation) {
resolveAsInteger(sqsListenerAnnotation.maxConcurrentMessages(), "maxConcurrentMessages"))
.messageVisibility(
resolveAsInteger(sqsListenerAnnotation.messageVisibilitySeconds(), "messageVisibility"))
.build();
.acknowledgementMode(resolve(sqsListenerAnnotation.acknowledgmentMode())).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.awspring.cloud.sqs.config;

import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import java.time.Duration;
import java.util.Collection;
import org.springframework.lang.Nullable;
Expand All @@ -38,12 +39,16 @@ public class SqsEndpoint extends AbstractEndpoint {

private final Integer maxMessagesPerPoll;

@Nullable
private final AcknowledgementMode acknowledgementMode;

protected SqsEndpoint(SqsEndpointBuilder builder) {
super(builder.queueNames, builder.factoryName, builder.id);
this.maxConcurrentMessages = builder.maxConcurrentMessages;
this.pollTimeoutSeconds = builder.pollTimeoutSeconds;
this.messageVisibility = builder.messageVisibility;
this.maxMessagesPerPoll = builder.maxMessagesPerPoll;
this.acknowledgementMode = builder.acknowledgementMode;
}

/**
Expand Down Expand Up @@ -91,6 +96,15 @@ public Duration getMessageVisibility() {
return this.messageVisibility != null ? Duration.ofSeconds(this.messageVisibility) : null;
}

/**
* Returns the acknowledgement mode configured for this endpoint.
* @return the acknowledgement mode.
*/
@Nullable
public AcknowledgementMode getAcknowledgementMode() {
return this.acknowledgementMode;
}

public static class SqsEndpointBuilder {

private Collection<String> queueNames;
Expand All @@ -107,6 +121,9 @@ public static class SqsEndpointBuilder {

private Integer maxMessagesPerPoll;

@Nullable
private AcknowledgementMode acknowledgementMode;

public SqsEndpointBuilder queueNames(Collection<String> queueNames) {
this.queueNames = queueNames;
return this;
Expand Down Expand Up @@ -142,6 +159,11 @@ public SqsEndpointBuilder id(String id) {
return this;
}

public SqsEndpointBuilder acknowledgementMode(@Nullable AcknowledgementMode acknowledgementMode) {
this.acknowledgementMode = acknowledgementMode;
return this;
}

public SqsEndpoint build() {
return new SqsEndpoint(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ private void configureFromSqsEndpoint(SqsEndpoint sqsEndpoint, SqsContainerOptio
ConfigUtils.INSTANCE.acceptIfNotNull(sqsEndpoint.getMaxConcurrentMessages(), options::maxConcurrentMessages)
.acceptIfNotNull(sqsEndpoint.getMaxMessagesPerPoll(), options::maxMessagesPerPoll)
.acceptIfNotNull(sqsEndpoint.getPollTimeout(), options::pollTimeout)
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility);
.acceptIfNotNull(sqsEndpoint.getMessageVisibility(), options::messageVisibility)
.acceptIfNotNull(sqsEndpoint.getAcknowledgementMode(), options::acknowledgementMode);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import java.time.Duration;
import java.util.Collection;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
Expand Down Expand Up @@ -65,4 +66,11 @@ SqsContainerOptionsBuilder messageSystemAttributeNames(
*/
SqsContainerOptionsBuilder queueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy);

/**
* Set the {@link AcknowledgementMode} for the container.
* @param acknowledgementMode the acknowledgement mode.
* @return this instance.
*/
SqsContainerOptionsBuilder acknowledgementMode(AcknowledgementMode acknowledgementMode);
tomazfernandes marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.annotation;

import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* Tests for {@link SqsListenerAcknowledgementMode} enum values
*
* @author Joao Calassio
*/
class SqsListenerAcknowledgementModeTests {

@ParameterizedTest
@EnumSource(AcknowledgementMode.class)
void shouldHaveAllValuesOfAcknowledgementModeEnum(final AcknowledgementMode acknowledgementMode) {
final SqsListenerAcknowledgementMode correspondingValue = assertDoesNotThrow(() ->
SqsListenerAcknowledgementMode.valueOf(acknowledgementMode.name()));
assertEquals(acknowledgementMode.name(), correspondingValue.name());
}

}
Loading