From 16e1f41de7d52e121f7b5ead4a51a54304317b00 Mon Sep 17 00:00:00 2001 From: Tri Tran Date: Wed, 9 May 2018 10:30:21 +0700 Subject: [PATCH 1/5] Add a new parameter for FIFO queue, names Content-BasedDeduplication --- Command/QueueCreateCommand.php | 9 +++++++- DependencyInjection/Compiler/SQSQueuePass.php | 4 +++- DependencyInjection/Configuration.php | 2 ++ Service/QueueManager.php | 3 ++- .../Command/QueueCreateCommandTest.php | 4 +++- .../Compiler/SQSQueuePassTest.php | 21 ++++++++++++------- 6 files changed, 32 insertions(+), 11 deletions(-) diff --git a/Command/QueueCreateCommand.php b/Command/QueueCreateCommand.php index 4dd385c..70526b8 100644 --- a/Command/QueueCreateCommand.php +++ b/Command/QueueCreateCommand.php @@ -63,6 +63,12 @@ protected function configure() 'VisibilityTimeout', 30 ) + ->addOption( + 'content_based_deduplication', + null, + InputOption::VALUE_NONE, + 'ContentBasedDeduplication' + ) ->setDescription('Create a queue by name and basic attributions'); } @@ -86,7 +92,8 @@ protected function execute(InputInterface $input, OutputInterface $output) 'MaximumMessageSize' => $input->getOption('maximum_message_size'), 'MessageRetentionPeriod' => $input->getOption('message_retention_period'), 'ReceiveMessageWaitTimeSeconds' => $input->getOption('receive_message_wait_time_seconds'), - 'VisibilityTimeout' => $input->getOption('visibility_timeout') + 'VisibilityTimeout' => $input->getOption('visibility_timeout'), + 'ContentBasedDeduplication' => $input->getOption('content_based_deduplication') ]); $io->text(sprintf('Created successfully. New Queue URL: %s', $queueUrl)); diff --git a/DependencyInjection/Compiler/SQSQueuePass.php b/DependencyInjection/Compiler/SQSQueuePass.php index e16992e..199eb19 100644 --- a/DependencyInjection/Compiler/SQSQueuePass.php +++ b/DependencyInjection/Compiler/SQSQueuePass.php @@ -81,7 +81,9 @@ public function process(ContainerBuilder $container) $queueOption['attributes']['redrive_policy']['dead_letter_queue'] ?? '', 'maxReceiveCount' => $queueOption['attributes']['redrive_policy']['max_receive_count'] ?? 5, - ]) : '' + ]) : '', + 'ContentBasedDeduplication' => + $queueOption['attributes']['content_based_deduplication'] ?? false ] ]); diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 2c3450f..01be10f 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -2,6 +2,7 @@ namespace TriTran\SqsQueueBundle\DependencyInjection; +use Symfony\Component\Config\Definition\Builder\BooleanNodeDefinition; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\ConfigurationInterface; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; @@ -65,6 +66,7 @@ protected function getSQSQueueAttributesNodeDef() ->append((new IntegerNodeDefinition('message_retention_period'))->defaultValue(345600)->min(60)->max(1209600))// 4 days ->append((new IntegerNodeDefinition('receive_message_wait_time_seconds'))->defaultValue(20)->min(0)->max(20))// seconds ->append((new IntegerNodeDefinition('visibility_timeout'))->defaultValue(30)->min(0)->max(43200))// second + ->append((new BooleanNodeDefinition('content_based_deduplication'))->defaultValue(false))// second ->append($this->getSQSRedrivePolicyNode()) ->end(); } diff --git a/Service/QueueManager.php b/Service/QueueManager.php index bb6df10..2b2bd6d 100644 --- a/Service/QueueManager.php +++ b/Service/QueueManager.php @@ -25,7 +25,8 @@ class QueueManager 'MessageRetentionPeriod' => 345600, // 4 days 'ReceiveMessageWaitTimeSeconds' => 0, 'VisibilityTimeout' => 30, - 'RedrivePolicy' => '' + 'RedrivePolicy' => '', + 'ContentBasedDeduplication' => false ]; /** diff --git a/Tests/Functional/Command/QueueCreateCommandTest.php b/Tests/Functional/Command/QueueCreateCommandTest.php index 1b9e389..5608ecb 100644 --- a/Tests/Functional/Command/QueueCreateCommandTest.php +++ b/Tests/Functional/Command/QueueCreateCommandTest.php @@ -34,7 +34,8 @@ public function setUp() 'MaximumMessageSize' => 1, 'MessageRetentionPeriod' => 1, 'ReceiveMessageWaitTimeSeconds' => 1, - 'VisibilityTimeout' => 1 + 'VisibilityTimeout' => 1, + 'ContentBasedDeduplication' => true, ]) ->willReturn('new-queue-url'); @@ -55,6 +56,7 @@ public function testExecute() '--message_retention_period' => 1, '--receive_message_wait_time_seconds' => 1, '--visibility_timeout' => 1, + '--content_based_deduplication' => true, ]); $output = $commandTester->getDisplay(); diff --git a/Tests/Unit/DependencyInjection/Compiler/SQSQueuePassTest.php b/Tests/Unit/DependencyInjection/Compiler/SQSQueuePassTest.php index 427dcdc..05c616f 100644 --- a/Tests/Unit/DependencyInjection/Compiler/SQSQueuePassTest.php +++ b/Tests/Unit/DependencyInjection/Compiler/SQSQueuePassTest.php @@ -109,7 +109,8 @@ public function configurationProvider(): array 'MessageRetentionPeriod' => 345600, 'ReceiveMessageWaitTimeSeconds' => 20, 'VisibilityTimeout' => 30, - 'RedrivePolicy' => '' + 'RedrivePolicy' => '', + 'ContentBasedDeduplication' => false ] ] ] @@ -130,7 +131,8 @@ public function configurationProvider(): array 'redrive_policy' => [ 'dead_letter_queue' => 'basic_dead_letter_queue_1', 'max_receive_count' => 1 - ] + ], + 'content_based_deduplication' => true ] ] ], @@ -147,7 +149,8 @@ public function configurationProvider(): array 'RedrivePolicy' => json_encode([ 'deadLetterTargetArn' => 'basic_dead_letter_queue_1', 'maxReceiveCount' => 1 - ]) + ]), + 'ContentBasedDeduplication' => true ] ] ] @@ -168,7 +171,8 @@ public function configurationProvider(): array 'redrive_policy' => [ 'dead_letter_queue' => 'basic_dead_letter_queue_2', 'max_receive_count' => 2 - ] + ], + 'content_based_deduplication' => true ] ] ], @@ -185,7 +189,8 @@ public function configurationProvider(): array 'RedrivePolicy' => json_encode([ 'deadLetterTargetArn' => 'basic_dead_letter_queue_2', 'maxReceiveCount' => 2 - ]) + ]), + 'ContentBasedDeduplication' => true ] ] ] @@ -207,7 +212,8 @@ public function configurationProvider(): array 'MessageRetentionPeriod' => 345600, 'ReceiveMessageWaitTimeSeconds' => 20, 'VisibilityTimeout' => 30, - 'RedrivePolicy' => '' + 'RedrivePolicy' => '', + 'ContentBasedDeduplication' => false ] ], 'basic-queue-2' => [ @@ -219,7 +225,8 @@ public function configurationProvider(): array 'MessageRetentionPeriod' => 345600, 'ReceiveMessageWaitTimeSeconds' => 20, 'VisibilityTimeout' => 30, - 'RedrivePolicy' => '' + 'RedrivePolicy' => '', + 'ContentBasedDeduplication' => false ] ] ] From f9c68d8bcae26df8a551602a57e2a9e37a1688c6 Mon Sep 17 00:00:00 2001 From: Tri Tran Date: Wed, 9 May 2018 11:04:16 +0700 Subject: [PATCH 2/5] Support VisibilityTimeout per message --- Service/BaseQueue.php | 11 ++++++----- Tests/Unit/Service/BaseQueueTest.php | 6 +++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/Service/BaseQueue.php b/Service/BaseQueue.php index c345d36..402ed4e 100644 --- a/Service/BaseQueue.php +++ b/Service/BaseQueue.php @@ -79,10 +79,10 @@ public function ping() public function sendMessage(Message $message, int $delay = 0) { $params = [ - 'DelaySeconds' => $delay, - 'MessageAttributes' => $message->getAttributes(), + 'QueueUrl' => $this->queueUrl, 'MessageBody' => $message->getBody(), - 'QueueUrl' => $this->queueUrl + 'DelaySeconds' => $delay, + 'MessageAttributes' => $message->getAttributes() ]; try { $result = $this->client->sendMessage($params); @@ -107,10 +107,11 @@ public function receiveMessage(int $limit = 1) try { $result = $this->client->receiveMessage([ + 'QueueUrl' => $this->queueUrl, 'AttributeNames' => ['All'], - 'MaxNumberOfMessages' => $limit, 'MessageAttributeNames' => ['All'], - 'QueueUrl' => $this->queueUrl, + 'MaxNumberOfMessages' => $limit, + 'VisibilityTimeout' => $this->attributes['VisibilityTimeout'] ?? 30, 'WaitTimeSeconds' => $this->attributes['ReceiveMessageWaitTimeSeconds'] ?? 0, ]); diff --git a/Tests/Unit/Service/BaseQueueTest.php b/Tests/Unit/Service/BaseQueueTest.php index b4c7f0b..f81ccb8 100644 --- a/Tests/Unit/Service/BaseQueueTest.php +++ b/Tests/Unit/Service/BaseQueueTest.php @@ -157,7 +157,10 @@ public function testReceiveMessage() { $limit = random_int(1, 10); $queueUrl = 'queue-url'; - $queueAttr = ['ReceiveMessageWaitTimeSeconds' => random_int(1, 10)]; + $queueAttr = [ + 'ReceiveMessageWaitTimeSeconds' => random_int(1, 10), + 'VisibilityTimeout' => random_int(0, 30) + ]; $expected = [ [ 'MessageId' => 'my-message-id', @@ -176,6 +179,7 @@ public function testReceiveMessage() 'MessageAttributeNames' => ['All'], 'QueueUrl' => $queueUrl, 'WaitTimeSeconds' => $queueAttr['ReceiveMessageWaitTimeSeconds'], + 'VisibilityTimeout' => $queueAttr['VisibilityTimeout'], ]) ->willReturn($this->getAwsResult(['Messages' => $expected])); From 5be81bfe8c4c507a75b2f67dfb3ca33b8d797d58 Mon Sep 17 00:00:00 2001 From: Tri Tran Date: Wed, 9 May 2018 12:19:09 +0700 Subject: [PATCH 3/5] Support MessageGroupID and MessageDeduplicationID in sending message to a FIFO queue --- Service/BaseQueue.php | 36 ++++++++++++-- Service/Message.php | 70 ++++++++++++++++++++++++--- Tests/Unit/Service/BaseQueueTest.php | 72 ++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 10 deletions(-) diff --git a/Service/BaseQueue.php b/Service/BaseQueue.php index 402ed4e..1398e94 100644 --- a/Service/BaseQueue.php +++ b/Service/BaseQueue.php @@ -44,20 +44,20 @@ class BaseQueue * @param string $queueName * @param string $queueUrl * @param AbstractWorker $queueWorker - * @param array $options + * @param array $attributes */ public function __construct( SqsClient $client, string $queueName, string $queueUrl, AbstractWorker $queueWorker, - array $options = [] + array $attributes = [] ) { $this->client = $client; $this->queueUrl = $queueUrl; $this->queueName = $queueName; $this->queueWorker = $queueWorker; - $this->attributes = $options; + $this->attributes = $attributes; } /** @@ -81,9 +81,29 @@ public function sendMessage(Message $message, int $delay = 0) $params = [ 'QueueUrl' => $this->queueUrl, 'MessageBody' => $message->getBody(), - 'DelaySeconds' => $delay, 'MessageAttributes' => $message->getAttributes() ]; + + if ($this->isFIFO()) { + if ($delay) { + trigger_error('FIFO queues don\'t support per-message delays, only per-queue delays.', E_USER_WARNING); + $delay = 0; + } + + if (empty($message->getGroupId())) { + throw new \InvalidArgumentException('MessageGroupId is required for FIFO queues.'); + } + $params['MessageGroupId'] = $message->getGroupId(); + + if (!empty($message->getDeduplicationId())) { + $params['MessageDeduplicationId'] = $message->getDeduplicationId(); + } + } + + if ($delay) { + $params['DelaySeconds'] = $delay; + } + try { $result = $this->client->sendMessage($params); $messageId = $result->get('MessageId'); @@ -257,4 +277,12 @@ public function getClient(): SqsClient { return $this->client; } + + /** + * @return bool + */ + final public function isFIFO(): bool + { + return '.fifo' === substr($this->getQueueName(), -5); + } } diff --git a/Service/Message.php b/Service/Message.php index 7145e5d..b8bd7df 100644 --- a/Service/Message.php +++ b/Service/Message.php @@ -23,6 +23,16 @@ class Message */ private $attributes; + /** + * @var string + */ + private $groupId; + + /** + * @var string + */ + private $deduplicationId; + /** * @var string */ @@ -33,11 +43,19 @@ class Message * * @param string $body * @param array $attributes + * @param string $groupId + * @param string $deduplicationId */ - public function __construct(string $body = '', array $attributes = []) - { + public function __construct( + string $body = '', + array $attributes = [], + string $groupId = '', + string $deduplicationId = '' + ) { $this->body = $body; $this->attributes = $attributes; + $this->groupId = $groupId; + $this->deduplicationId = $deduplicationId; } /** @@ -51,7 +69,7 @@ public function getId(): string /** * @param string $id * - * @return $this + * @return Message */ public function setId(string $id) { @@ -71,7 +89,7 @@ public function getBody(): string /** * @param string $body * - * @return $this + * @return Message */ public function setBody(string $body) { @@ -91,7 +109,7 @@ public function getAttributes(): array /** * @param array $attributes * - * @return $this + * @return Message */ public function setAttributes(array $attributes) { @@ -100,6 +118,46 @@ public function setAttributes(array $attributes) return $this; } + /** + * @return string + */ + public function getGroupId(): string + { + return $this->groupId; + } + + /** + * @param string $groupId + * + * @return Message + */ + public function setGroupId(string $groupId) + { + $this->groupId = $groupId; + + return $this; + } + + /** + * @return string + */ + public function getDeduplicationId(): string + { + return $this->deduplicationId; + } + + /** + * @param string $deduplicationId + * + * @return Message + */ + public function setDeduplicationId(string $deduplicationId) + { + $this->deduplicationId = $deduplicationId; + + return $this; + } + /** * @return string */ @@ -111,7 +169,7 @@ public function getReceiptHandle(): string /** * @param string $receiptHandle * - * @return $this + * @return Message */ public function setReceiptHandle(string $receiptHandle) { diff --git a/Tests/Unit/Service/BaseQueueTest.php b/Tests/Unit/Service/BaseQueueTest.php index f81ccb8..28f27a5 100644 --- a/Tests/Unit/Service/BaseQueueTest.php +++ b/Tests/Unit/Service/BaseQueueTest.php @@ -6,6 +6,7 @@ use Aws\Exception\AwsException; use Aws\Result; use Aws\Sqs\SqsClient; +use PHPUnit\Framework\Error as PHPUnitError; use PHPUnit\Framework\TestCase; use TriTran\SqsQueueBundle\Service\BaseQueue; use TriTran\SqsQueueBundle\Service\Message; @@ -130,6 +131,36 @@ public function testSendMessage() ); } + /** + * Test: send message to a FIFO queue + */ + public function testSendMessageToFifoQueue() + { + $messageBody = 'my-message'; + $messageAttr = ['x', 'y', 'z']; + $queueUrl = 'queue-url'; + $groupId = 'group-name'; + $deduplicationId = 'deduplication-id'; + + $client = $this->getAwsClient(); + $client->expects($this->any()) + ->method('sendMessage') + ->with([ + 'MessageAttributes' => $messageAttr, + 'MessageBody' => $messageBody, + 'QueueUrl' => $queueUrl, + 'MessageGroupId' => $groupId, + 'MessageDeduplicationId' => $deduplicationId, + ]) + ->willReturn($this->getAwsResult(['MessageId' => 'new-message-id'])); + + $queue = new BaseQueue($client, 'queue-name.fifo', $queueUrl, new BasicWorker(), []); + $this->assertEquals( + 'new-message-id', + $queue->sendMessage(new Message($messageBody, $messageAttr, $groupId, $deduplicationId)) + ); + } + /** * Test: send message to a queue in failure */ @@ -150,6 +181,47 @@ public function testSendMessageFailure() $queue->sendMessage(new Message('my-message', [])); } + /** + * Test: send message to a FIFO queue + */ + public function testSendMessageToFifoQueueFailure() + { + $client = $this->getAwsClient(); + $client->expects($this->any()) + ->method('sendMessage') + ->withAnyParameters() + ->willThrowException(new AwsException( + 'AWS Client Exception', + new Command('send-message-command') + )); + + $queue = new BaseQueue($client, 'queue-name.fifo', 'queue-url', new BasicWorker(), []); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('MessageGroupId is required for FIFO queues.'); + $queue->sendMessage(new Message('my-message', [], '')); + } + + /** + * Test: send message to a FIFO queue + */ + public function testSendMessageToFifoQueueWarning() + { + $client = $this->getAwsClient(); + $client->expects($this->any()) + ->method('sendMessage') + ->withAnyParameters() + ->willThrowException(new AwsException( + 'AWS Client Exception', + new Command('send-message-command') + )); + + $queue = new BaseQueue($client, 'queue-name.fifo', 'queue-url', new BasicWorker(), []); + + $this->expectException(PHPUnitError\Warning::class); + $queue->sendMessage(new Message('my-message', [], ''), random_int(0, 10)); + } + /** * Test: receive Message */ From 34e674dbe1e8b3f73ac6845b792b3fa8648e59c3 Mon Sep 17 00:00:00 2001 From: Tri Tran Date: Wed, 9 May 2018 12:32:31 +0700 Subject: [PATCH 4/5] Update README --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 3cb7fe8..7c7d6a7 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,10 @@ class DefaultController extends Controller } ``` +> For a FIFO queue, you must associate a non-empty `MessageGroupId` with a message. Otherwise, the action fails.
+> You may provide a `MessageDeduplicationId` explicitly. If you aren't able to provide a `MessageDeduplicationId` and you enable `ContentBasedDeduplication` for your queue, Amazon SQS uses a SHA-256 hash to generate the `MessageDeduplicationId` using the body of the message (but not the attributes of the message).
+> For more information about FIFO queue, please take a look at [Amazon SQS FIFO (First-In-First-Out) Queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) + #### Queue Behaviours |Behaviour|Arguments|Description| From 29624dad8afef327921d7c65741a0b5d67f1cd56 Mon Sep 17 00:00:00 2001 From: Tri Tran Date: Wed, 9 May 2018 12:44:29 +0700 Subject: [PATCH 5/5] Add more Test cases --- Tests/Unit/Service/MessageTest.php | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/Tests/Unit/Service/MessageTest.php b/Tests/Unit/Service/MessageTest.php index 9ec1253..1db0918 100644 --- a/Tests/Unit/Service/MessageTest.php +++ b/Tests/Unit/Service/MessageTest.php @@ -18,11 +18,15 @@ public function testConstruction() { $body = 'my-body'; $attributes = ['a', 'b', 'c']; - $message = new Message($body, $attributes); + $groupId = 'group-id'; + $deduplicationID = 'deduplication-id'; + $message = new Message($body, $attributes, $groupId, $deduplicationID); $this->assertInstanceOf(Message::class, $message); $this->assertEquals($body, $message->getBody()); $this->assertEquals($attributes, $message->getAttributes()); + $this->assertEquals($groupId, $message->getGroupId()); + $this->assertEquals($deduplicationID, $message->getDeduplicationId()); } /** @@ -30,7 +34,7 @@ public function testConstruction() */ public function testGetterSetter() { - $message = new Message('', []); + $message = new Message('', [], '', ''); $this->assertInstanceOf(Message::class, $message->setId('my-id')); $this->assertEquals('my-id', $message->getId()); @@ -40,5 +44,9 @@ public function testGetterSetter() $this->assertEquals(['a', 'b', 'c'], $message->getAttributes()); $this->assertInstanceOf(Message::class, $message->setReceiptHandle('my-receipt')); $this->assertEquals('my-receipt', $message->getReceiptHandle()); + $this->assertInstanceOf(Message::class, $message->setGroupId('group-id')); + $this->assertEquals('group-id', $message->getGroupId()); + $this->assertInstanceOf(Message::class, $message->setDeduplicationId('deduplication-id')); + $this->assertEquals('deduplication-id', $message->getDeduplicationId()); } }