Queue operation options
One problem with queues is handling the small differences between adapters. For example, AWS SQS accepts a delay on a send operation, while another adapter may not offer that option at all, or may expose it under a different option key.
The library approach is very simple. Options are passed directly to the adapter:
$queue->send("my message", [
    "DelaySeconds" => 20, // valid for AWS SQS
]);
$queue->receive([
    "WaitTimeSeconds" => 15, // valid for AWS SQS
]);
This means that if you want to manage different adapters with different options, you have to create an adaptation layer for those configurations as well. For example, with functions:
$queue->send("my_message", queue_options([
    "delay" => 20,
]));
$queue->receive(queue_options([
    "timeout" => 15*60,
]));
Where the queue_options function could be something like:
function queue_options(array $options = []) {
    $opts = [];
    if (array_key_exists("timeout", $options)) {
        $opts = array_merge($opts, ["VisibilityTimeout" => $options["timeout"]]);
    }
    if (array_key_exists("delay", $options)) {
        $opts = array_merge($opts, ["DelaySeconds" => $options["delay"]]);
    }
    return $opts;
}
queue_options(["delay" => 5]); // ["DelaySeconds" => 5]
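As the number of options grows, the chain of `if` blocks can be replaced by a lookup table. The following is only a sketch of that idea: `translate_options` and `$sqsMap` are hypothetical names that are not part of the library, and the SQS keys are the ones used in the example above.

```php
<?php
// Hypothetical helper (not part of the library): translate generic option
// names into adapter-specific ones using a lookup table.
function translate_options(array $map, array $options): array
{
    $translated = [];
    foreach ($map as $generic => $native) {
        if (array_key_exists($generic, $options)) {
            $translated[$native] = $options[$generic];
        }
    }
    return $translated;
}

// Assumed mapping for AWS SQS, mirroring the keys used above.
$sqsMap = [
    "timeout" => "VisibilityTimeout",
    "delay"   => "DelaySeconds",
];

// Options that are missing from the map are silently dropped.
$result = translate_options($sqsMap, ["delay" => 5, "unknown" => 1]);
```

With this shape, supporting another adapter means supplying another map rather than writing another function body.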
Of course, with this example, if you change your queue adapter you also have to change the queue_options function so that it converts the options into the right form. With dependency injection, however, you can use invokable classes:
class SQSQueueOptionAdapter
{
    function __invoke(array $options = []) {
        $opts = [];
        if (array_key_exists("delay", $options)) {
            $opts = array_merge($opts, ["DelaySeconds" => $options["delay"]]);
        }
        return $opts;
    }
}
class RabbitMQQueueOptionAdapter
{
    function __invoke(array $options = []) {
        $opts = [];
        if (array_key_exists("delay", $options)) {
            $opts = array_merge($opts, ["x-delayed-message" => $options["delay"]]);
        }
        return $opts;
    }
}
That way you can swap the option adapter in your DI configuration and use those objects as functions:
$queueOptionsAdapter = $this->container->get("queue.options.adapter");
$queue->send("my_message", $queueOptionsAdapter([
    "delay" => 20,
]));
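To make the swap concrete, here is a minimal sketch of what the DI wiring could look like. Everything in it is an assumption for illustration: `TinyContainer` stands in for whatever container you actually use, and `SqsOptions` and `RabbitOptions` are cut-down, hypothetical versions of the option adapters shown earlier.

```php
<?php
// Hypothetical, cut-down option adapters (illustrative stand-ins only).
final class SqsOptions
{
    public function __invoke(array $options = []): array
    {
        return array_key_exists("delay", $options)
            ? ["DelaySeconds" => $options["delay"]]
            : [];
    }
}

final class RabbitOptions
{
    public function __invoke(array $options = []): array
    {
        return array_key_exists("delay", $options)
            ? ["x-delayed-message" => $options["delay"]]
            : [];
    }
}

// A deliberately tiny container: service ids map to factory closures.
// A real project would use its framework's container instead.
final class TinyContainer
{
    private array $factories;

    public function __construct(array $factories)
    {
        $this->factories = $factories;
    }

    public function get(string $id)
    {
        if (!isset($this->factories[$id])) {
            throw new InvalidArgumentException("Unknown service: $id");
        }
        return ($this->factories[$id])();
    }
}

// Configuration for SQS: only this one line changes between adapters.
$sqsContainer = new TinyContainer([
    "queue.options.adapter" => fn () => new SqsOptions(),
]);

// Configuration for RabbitMQ: same service id, different implementation.
$rabbitContainer = new TinyContainer([
    "queue.options.adapter" => fn () => new RabbitOptions(),
]);

// The calling code is identical in both cases.
$sqsResult = ($sqsContainer->get("queue.options.adapter"))(["delay" => 20]);
$rabbitResult = ($rabbitContainer->get("queue.options.adapter"))(["delay" => 20]);
```

The calling code only knows the service id `queue.options.adapter`, so changing the queue backend is a configuration change rather than a code change.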
This project is sponsored by Corley SRL