Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Why wont the consumer does not consume multiple messages when batchSize is defined. #266

Closed
michaelparkadze opened this issue Jun 22, 2021 · 3 comments
Labels

Comments

@michaelparkadze
Copy link

I am trying to use sqs-consumer to process multiple messages in parallel and for some reason I can not get it to work, so I hope someone could show me what I am doing wrong here.

Here is the code example:

SqsConsumer class

 export default class SqsConsumer {
  public sqsConsumer: Consumer;

  constructor(params: CreateSqsSubscriberParams) {
    const { queueUrl, handleMessage, sqsOptions, visibilityTimeout, waitTimeSeconds, batchSize } = params;

    this.sqsConsumer = Consumer.create({
      queueUrl,
      handleMessage,
      visibilityTimeout,
      waitTimeSeconds,
      batchSize,
      sqs: new SQS(sqsOptions),
    });
  }

  startReadingMessages(): void {
    console.log('reading messages');
    this.sqsConsumer.start();

    this.sqsConsumer.on('error', (err) => {
      logger.error(err.message);
    });

    this.sqsConsumer.on('processing_error', (err) => {
      logger.error(err.message);
    });

    this.sqsConsumer.on('timeout_error', (err) => {
      logger.error(err.message);
    });

    this.sqsConsumer.on('response_processed', () => {
      logger.info('First Batch has successfully processed');
    });
  }
} 

Example subscriber:

export const assetNotificationSubscribe = () => {
  const sqsConsumer = new SqsConsumer({
    queueUrl: config.awsSqs.assetsAssetNotificationQueueUrl,
    visibilityTimeout: 30,
    waitTimeSeconds: 20,
    batchSize: 5,
    sqsOptions: {
      correctClockSkew: true,
    },
    handleMessage,
  });

  sqsConsumer.startReadingMessages();
};

Even though the batchSize is 5 the 'response_processed' log gets logged each time a single process is done. Thanks in advance.

@michaelparkadze
Copy link
Author

michaelparkadze commented Jun 22, 2021

Update:
I read about handleMessageBatch and passed it in the new SqsConsumer and added it in the constructor of the class and I am still having the processes handled one at a time and not parallel.

@michaelparkadze
Copy link
Author

I ran some more test and I found one that even though my batchSize was for example 5, I only got 2 messages in the batch even though there were more in the queue itself. I found that this might happen in the amazon receiveMessage page:

MaxNumberOfMessages:
"Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1. "

So I assume the same might happen in this situation.

@nguyentoanit
Copy link

I got same issue here. Anyone has any idea to resolve it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants