Skip to content

Commit

Permalink
Merge pull request #32 from trandangtri/feature/issue-30
Browse files Browse the repository at this point in the history
Delivers a message to a FIFO queue.
  • Loading branch information
trandangtri authored May 9, 2018
2 parents 9c97a2d + 29624da commit 4714017
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 28 deletions.
9 changes: 8 additions & 1 deletion Command/QueueCreateCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ protected function configure()
'VisibilityTimeout',
30
)
->addOption(
'content_based_deduplication',
null,
InputOption::VALUE_NONE,
'ContentBasedDeduplication'
)
->setDescription('Create a queue by name and basic attributions');
}

Expand All @@ -86,7 +92,8 @@ protected function execute(InputInterface $input, OutputInterface $output)
'MaximumMessageSize' => $input->getOption('maximum_message_size'),
'MessageRetentionPeriod' => $input->getOption('message_retention_period'),
'ReceiveMessageWaitTimeSeconds' => $input->getOption('receive_message_wait_time_seconds'),
'VisibilityTimeout' => $input->getOption('visibility_timeout')
'VisibilityTimeout' => $input->getOption('visibility_timeout'),
'ContentBasedDeduplication' => $input->getOption('content_based_deduplication')
]);

$io->text(sprintf('Created successfully. New Queue URL: <comment>%s</comment>', $queueUrl));
Expand Down
4 changes: 3 additions & 1 deletion DependencyInjection/Compiler/SQSQueuePass.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public function process(ContainerBuilder $container)
$queueOption['attributes']['redrive_policy']['dead_letter_queue'] ?? '',
'maxReceiveCount' =>
$queueOption['attributes']['redrive_policy']['max_receive_count'] ?? 5,
]) : ''
]) : '',
'ContentBasedDeduplication' =>
$queueOption['attributes']['content_based_deduplication'] ?? false
]
]);

Expand Down
2 changes: 2 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace TriTran\SqsQueueBundle\DependencyInjection;

use Symfony\Component\Config\Definition\Builder\BooleanNodeDefinition;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use Symfony\Component\Config\Definition\ConfigurationInterface;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
Expand Down Expand Up @@ -65,6 +66,7 @@ protected function getSQSQueueAttributesNodeDef()
->append((new IntegerNodeDefinition('message_retention_period'))->defaultValue(345600)->min(60)->max(1209600))// 4 days
->append((new IntegerNodeDefinition('receive_message_wait_time_seconds'))->defaultValue(20)->min(0)->max(20))// seconds
->append((new IntegerNodeDefinition('visibility_timeout'))->defaultValue(30)->min(0)->max(43200))// second
->append((new BooleanNodeDefinition('content_based_deduplication'))->defaultValue(false))// second
->append($this->getSQSRedrivePolicyNode())
->end();
}
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class DefaultController extends Controller
}
```

> For a FIFO queue, you must associate a non-empty `MessageGroupId` with a message. Otherwise, the action fails.<br/>
> You may provide a `MessageDeduplicationId` explicitly. If you aren't able to provide a `MessageDeduplicationId` and you enable `ContentBasedDeduplication` for your queue, Amazon SQS uses a SHA-256 hash to generate the `MessageDeduplicationId` using the body of the message (but not the attributes of the message).<br />
> For more information about FIFO queue, please take a look at [Amazon SQS FIFO (First-In-First-Out) Queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html)

#### Queue Behaviours

|Behaviour|Arguments|Description|
Expand Down
45 changes: 37 additions & 8 deletions Service/BaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ class BaseQueue
* @param string $queueName
* @param string $queueUrl
* @param AbstractWorker $queueWorker
* @param array $options
* @param array $attributes
*/
public function __construct(
SqsClient $client,
string $queueName,
string $queueUrl,
AbstractWorker $queueWorker,
array $options = []
array $attributes = []
) {
$this->client = $client;
$this->queueUrl = $queueUrl;
$this->queueName = $queueName;
$this->queueWorker = $queueWorker;
$this->attributes = $options;
$this->attributes = $attributes;
}

/**
Expand All @@ -79,11 +79,31 @@ public function ping()
public function sendMessage(Message $message, int $delay = 0)
{
$params = [
'DelaySeconds' => $delay,
'MessageAttributes' => $message->getAttributes(),
'QueueUrl' => $this->queueUrl,
'MessageBody' => $message->getBody(),
'QueueUrl' => $this->queueUrl
'MessageAttributes' => $message->getAttributes()
];

if ($this->isFIFO()) {
if ($delay) {
trigger_error('FIFO queues don\'t support per-message delays, only per-queue delays.', E_USER_WARNING);
$delay = 0;
}

if (empty($message->getGroupId())) {
throw new \InvalidArgumentException('MessageGroupId is required for FIFO queues.');
}
$params['MessageGroupId'] = $message->getGroupId();

if (!empty($message->getDeduplicationId())) {
$params['MessageDeduplicationId'] = $message->getDeduplicationId();
}
}

if ($delay) {
$params['DelaySeconds'] = $delay;
}

try {
$result = $this->client->sendMessage($params);
$messageId = $result->get('MessageId');
Expand All @@ -107,10 +127,11 @@ public function receiveMessage(int $limit = 1)

try {
$result = $this->client->receiveMessage([
'QueueUrl' => $this->queueUrl,
'AttributeNames' => ['All'],
'MaxNumberOfMessages' => $limit,
'MessageAttributeNames' => ['All'],
'QueueUrl' => $this->queueUrl,
'MaxNumberOfMessages' => $limit,
'VisibilityTimeout' => $this->attributes['VisibilityTimeout'] ?? 30,
'WaitTimeSeconds' => $this->attributes['ReceiveMessageWaitTimeSeconds'] ?? 0,
]);

Expand Down Expand Up @@ -256,4 +277,12 @@ public function getClient(): SqsClient
{
return $this->client;
}

/**
* @return bool
*/
final public function isFIFO(): bool
{
return '.fifo' === substr($this->getQueueName(), -5);
}
}
70 changes: 64 additions & 6 deletions Service/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ class Message
*/
private $attributes;

/**
* @var string
*/
private $groupId;

/**
* @var string
*/
private $deduplicationId;

/**
* @var string
*/
Expand All @@ -33,11 +43,19 @@ class Message
*
* @param string $body
* @param array $attributes
* @param string $groupId
* @param string $deduplicationId
*/
public function __construct(string $body = '', array $attributes = [])
{
public function __construct(
string $body = '',
array $attributes = [],
string $groupId = '',
string $deduplicationId = ''
) {
$this->body = $body;
$this->attributes = $attributes;
$this->groupId = $groupId;
$this->deduplicationId = $deduplicationId;
}

/**
Expand All @@ -51,7 +69,7 @@ public function getId(): string
/**
* @param string $id
*
* @return $this
* @return Message
*/
public function setId(string $id)
{
Expand All @@ -71,7 +89,7 @@ public function getBody(): string
/**
* @param string $body
*
* @return $this
* @return Message
*/
public function setBody(string $body)
{
Expand All @@ -91,7 +109,7 @@ public function getAttributes(): array
/**
* @param array $attributes
*
* @return $this
* @return Message
*/
public function setAttributes(array $attributes)
{
Expand All @@ -100,6 +118,46 @@ public function setAttributes(array $attributes)
return $this;
}

/**
* @return string
*/
public function getGroupId(): string
{
return $this->groupId;
}

/**
* @param string $groupId
*
* @return Message
*/
public function setGroupId(string $groupId)
{
$this->groupId = $groupId;

return $this;
}

/**
* @return string
*/
public function getDeduplicationId(): string
{
return $this->deduplicationId;
}

/**
* @param string $deduplicationId
*
* @return Message
*/
public function setDeduplicationId(string $deduplicationId)
{
$this->deduplicationId = $deduplicationId;

return $this;
}

/**
* @return string
*/
Expand All @@ -111,7 +169,7 @@ public function getReceiptHandle(): string
/**
* @param string $receiptHandle
*
* @return $this
* @return Message
*/
public function setReceiptHandle(string $receiptHandle)
{
Expand Down
3 changes: 2 additions & 1 deletion Service/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class QueueManager
'MessageRetentionPeriod' => 345600, // 4 days
'ReceiveMessageWaitTimeSeconds' => 0,
'VisibilityTimeout' => 30,
'RedrivePolicy' => ''
'RedrivePolicy' => '',
'ContentBasedDeduplication' => false
];

/**
Expand Down
4 changes: 3 additions & 1 deletion Tests/Functional/Command/QueueCreateCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public function setUp()
'MaximumMessageSize' => 1,
'MessageRetentionPeriod' => 1,
'ReceiveMessageWaitTimeSeconds' => 1,
'VisibilityTimeout' => 1
'VisibilityTimeout' => 1,
'ContentBasedDeduplication' => true,
])
->willReturn('new-queue-url');

Expand All @@ -55,6 +56,7 @@ public function testExecute()
'--message_retention_period' => 1,
'--receive_message_wait_time_seconds' => 1,
'--visibility_timeout' => 1,
'--content_based_deduplication' => true,
]);

$output = $commandTester->getDisplay();
Expand Down
21 changes: 14 additions & 7 deletions Tests/Unit/DependencyInjection/Compiler/SQSQueuePassTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public function configurationProvider(): array
'MessageRetentionPeriod' => 345600,
'ReceiveMessageWaitTimeSeconds' => 20,
'VisibilityTimeout' => 30,
'RedrivePolicy' => ''
'RedrivePolicy' => '',
'ContentBasedDeduplication' => false
]
]
]
Expand All @@ -130,7 +131,8 @@ public function configurationProvider(): array
'redrive_policy' => [
'dead_letter_queue' => 'basic_dead_letter_queue_1',
'max_receive_count' => 1
]
],
'content_based_deduplication' => true
]
]
],
Expand All @@ -147,7 +149,8 @@ public function configurationProvider(): array
'RedrivePolicy' => json_encode([
'deadLetterTargetArn' => 'basic_dead_letter_queue_1',
'maxReceiveCount' => 1
])
]),
'ContentBasedDeduplication' => true
]
]
]
Expand All @@ -168,7 +171,8 @@ public function configurationProvider(): array
'redrive_policy' => [
'dead_letter_queue' => 'basic_dead_letter_queue_2',
'max_receive_count' => 2
]
],
'content_based_deduplication' => true
]
]
],
Expand All @@ -185,7 +189,8 @@ public function configurationProvider(): array
'RedrivePolicy' => json_encode([
'deadLetterTargetArn' => 'basic_dead_letter_queue_2',
'maxReceiveCount' => 2
])
]),
'ContentBasedDeduplication' => true
]
]
]
Expand All @@ -207,7 +212,8 @@ public function configurationProvider(): array
'MessageRetentionPeriod' => 345600,
'ReceiveMessageWaitTimeSeconds' => 20,
'VisibilityTimeout' => 30,
'RedrivePolicy' => ''
'RedrivePolicy' => '',
'ContentBasedDeduplication' => false
]
],
'basic-queue-2' => [
Expand All @@ -219,7 +225,8 @@ public function configurationProvider(): array
'MessageRetentionPeriod' => 345600,
'ReceiveMessageWaitTimeSeconds' => 20,
'VisibilityTimeout' => 30,
'RedrivePolicy' => ''
'RedrivePolicy' => '',
'ContentBasedDeduplication' => false
]
]
]
Expand Down
Loading

0 comments on commit 4714017

Please sign in to comment.