Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@SQSListener method: @SnsNotificationMessage parameter doesn't work in batching method signatures #1129

Closed
EstelaGarcia opened this issue Apr 3, 2024 · 8 comments
Labels
component: sqs SQS integration related issue status: waiting-for-feedback Waiting for feedback from issuer

Comments

@EstelaGarcia
Copy link

EstelaGarcia commented Apr 3, 2024

Type: Bug

Component:
SQS

Describe the bug
Using Spring Cloud 3.1.1 (have also tried 3.2.0-M1)

The following method signatures result in an SQS Listener that works and converts to the appropriate object:
SQS message, single object

    @SqsListener(value = "queuename")
    public void onMessage(CustomObjectType notification) throws Exception {...}

SQS message, List of objects

    @SqsListener(value = "queuename")
    public void onMessage(List<CustomObjectType> notifications) throws Exception {...}

SNS to SQS message, single object

    @SqsListener(value = "queuename")
    public void onMessage(@SnsNotificationMessage CustomObjectType notification) throws Exception {...}

However the following does not work as no converter can be found:

    @SqsListener(value = "queuename")
    public void onMessage(@SnsNotificationMessage List<CustomObjectType> notification) throws Exception {...}

The error occurs in the following method in io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter

    protected final Object invokeHandler(Collection<Message<T>> messages) {
        try {
            return this.handlerMethod.invoke(MessageBuilder.withPayload(messages).build(), new Object[0]);
        } catch (Exception var3) {
            throw this.createListenerException((Collection)messages, var3);
        }
    }

CustomObjectType is a simple JSON object, and the conversion occurs correctly when handling a single message at a time.

From the comments under the PR that introduced the @SnsNotificationMessage annotation, we are not the only ones experiencing this problem.

@tomazfernandes
Copy link
Contributor

Hi @EstelaGarcia, thanks for opening the issue.

You're right in that batch Sns Messages are not currently supported.

Would you like to contribute a PR with that functionality?

Thanks

@tomazfernandes
Copy link
Contributor

Oh, my mistake, we have a PR already. Please ignore the last message.

Thanks.

@tomazfernandes tomazfernandes added component: sqs SQS integration related issue status: waiting-for-feedback Waiting for feedback from issuer labels Apr 15, 2024
@nandeeshsu
Copy link

Hi,
I'm looking for this feature. Any idea when this will be available?

In the meantime, I copied the NotificationMessageArgumentResolver and the SnsMessageConverter to my code base.
and did the below configuration to use the copied resolve and converter.

`@Configuration
public class AWSSQSConfiguration {

@Bean
SqsListenerConfigurer configurer() {
    return registrar -> registrar.manageMethodArgumentResolvers(
            handlerMethodArgumentResolvers -> handlerMethodArgumentResolvers.add(2,
                    new NotificationMessageArgumentResolver(messageConverter(), new ObjectMapper())));
}

@Bean
public MessageConverter messageConverter() {
    return new MappingJackson2MessageConverter();
}

`

@SqsListener(value = "${com.example.fto.aws.sqs.queue-url}",
            maxMessagesPerPoll="15",
            pollTimeoutSeconds="20",
            messageVisibilitySeconds="5",
            maxConcurrentMessages="15")
public void listenForMultipleMessages(@SnsNotificationMessage List<FTOOrderEventDto> ftoOrderEventDtoList) {

however the conversion still doesn't convert to appropriate object. I'm able to see multiple message read from the SQS and tries to convert into FTOOrderEventDto, however all the properties/attributes are null

Not sure what wrong I'm doing here. any suggestions?

@nilesh-chordia
Copy link

nilesh-chordia commented Aug 8, 2024

I am facing below issue after adding above conf wrt to SQS

@SqsListener(value = "${LISTENER_SQS:test-sqs}",
    factory = "factory", acknowledgementMode = SqsListenerAcknowledgementMode.ON_SUCCESS)
public void processRequest(@SnsNotificationMessage final Class obj)
{
}

org.springframework.context.ApplicationContextException: Failed to start bean 'io.awspring.cloud.messaging.internalEndpointRegistryBeanName'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:186)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:363)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:160)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:128)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:968)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:618)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:738)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:440)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:324)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1317)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306)
at com.nilesh.central.Main.main(Main.java:33)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:95)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65)
Caused by: java.util.concurrent.CompletionException: io.awspring.cloud.sqs.QueueAttributesResolvingException: Error resolving attributes for queue QUEUE_NAME with strategy CREATE and queueAttributesNames []

@nilesh-chordia
Copy link

@EstelaGarcia and @nandeeshsu , Have you faced above issue,

@EstelaGarcia
Copy link
Author

@nilesh-chordia , I ended up doing my own parsing of the incoming messages (having the argument in the method be a list of strings) as I didn't have time to figure out a cleaner way to do it (we had a deadline).
However, your issue seems to be related to the queue name not being set correctly, although I am unsure as to why (you have a default value in the value attribute of the SqsListener annotation so it should be at least picking that up):

Error resolving attributes for queue QUEUE_NAME with strategy CREATE and queueAttributesNames []

@nilesh-chordia
Copy link

@EstelaGarcia , Thanks for your response, Issue is resolved.
Problem is: I am using older version of localstack, after using localstack of version 3.x. Issue resolved.

@tomazfernandes
Copy link
Contributor

PR with the enhancement was merged: #1191

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component: sqs SQS integration related issue status: waiting-for-feedback Waiting for feedback from issuer
Projects
None yet
Development

No branches or pull requests

4 participants