Queue operation options

Walter Dal Mut edited this page Jan 21, 2017 · 1 revision

One problem with queues is managing the small differences between adapters. For example, AWS SQS accepts a delay on a send operation, while another adapter may not offer that option at all, or may expose it under a different option key.

The library approach is very simple. Options are passed directly to the adapter:

$queue->send("my message", [
  "DelaySeconds" => 20, // valid for AWS SQS
]);

$queue->receive([
  "WaitTimeSeconds" => 15, // valid for AWS SQS
]);

This means that if you want to support several adapters with different options, you also have to create an adaptation layer for those options. For example, with a function:

$queue->send("my_message", queue_options([
  "delay" => 20,
]));

$queue->receive(queue_options([
  "timeout" => 15*60,
]));

Where the queue_options function could be something like:

function queue_options(array $options = []) {
  $opts = [];

  // Translate the generic keys into the ones AWS SQS expects
  if (array_key_exists("timeout", $options)) {
    $opts = array_merge($opts, ["VisibilityTimeout" => $options["timeout"]]);
  }
  if (array_key_exists("delay", $options)) {
    $opts = array_merge($opts, ["DelaySeconds" => $options["delay"]]);
  }

  return $opts;
}

queue_options(["delay" => 5]); // ["DelaySeconds" => 5]
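
The same translation can also be written as a lookup table, which keeps the adapter-specific differences in a single array. This is only a minimal sketch: `translate_options` and `$sqsMap` are hypothetical names and not part of the library.

```php
<?php
// Hypothetical helper, not part of the library: renames generic option
// keys to adapter-specific ones using a lookup table.
function translate_options(array $map, array $options = []): array
{
    $opts = [];

    foreach ($options as $key => $value) {
        // Keys without a mapping are dropped rather than forwarded blindly.
        if (array_key_exists($key, $map)) {
            $opts[$map[$key]] = $value;
        }
    }

    return $opts;
}

// Mapping for AWS SQS, using the same keys as the queue_options function.
$sqsMap = [
    "timeout" => "VisibilityTimeout",
    "delay"   => "DelaySeconds",
];

$translated = translate_options($sqsMap, ["delay" => 5, "unknown" => 1]);
// $translated is ["DelaySeconds" => 5]
```

With this shape, supporting another adapter would mean supplying another map rather than rewriting the function, as long as the adapters differ only in key names.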

Of course, with this example, if you change your queue adapter you also have to change the queue_options function so that it converts the options into the right form. With dependency injection, however, you can use invokable objects instead:

AWS SQS options adapter

class SQSQueueOptionAdapter
{
  public function __invoke(array $options = []) {
    $opts = [];

    if (array_key_exists("delay", $options)) {
      $opts = array_merge($opts, ["DelaySeconds" => $options["delay"]]);
    }

    return $opts;
  }
}

RabbitMQ options adapter

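class RabbitMQQueueOptionAdapter
{
  public function __invoke(array $options = []) {
    $opts = [];

    // Illustrative key: adjust to whatever your RabbitMQ adapter expects
    // (the delayed message exchange plugin, for instance, reads an
    // "x-delay" header in milliseconds).
    if (array_key_exists("delay", $options)) {
      $opts = array_merge($opts, ["x-delayed-message" => $options["delay"]]);
    }

    return $opts;
  }
}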

That way you can swap the option adapter in your DI configuration and use those objects as functions:

$queueOptionsAdapter = $this->container->get("queue.options.adapter");

$queue->send("my_message", $queueOptionsAdapter([
  "delay" => 20,
]));
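
To make the swap concrete, here is a rough sketch of what the DI registration could look like. `TinyContainer` is a hypothetical stand-in for whatever container your application really uses, and the two closures stand in for the option adapter classes. Only the service id `queue.options.adapter` comes from the example above.

```php
<?php
// Hypothetical minimal container, standing in for whatever DI container
// the application really uses.
final class TinyContainer
{
    /** @var array<string, callable> */
    private $factories = [];

    public function set(string $id, callable $factory): void
    {
        $this->factories[$id] = $factory;
    }

    public function get(string $id)
    {
        if (!array_key_exists($id, $this->factories)) {
            throw new InvalidArgumentException("Unknown service: " . $id);
        }
        return ($this->factories[$id])();
    }
}

// Stand-ins for the two option adapters; each maps "delay" to its own key.
$sqsOptions = function (array $options = []): array {
    return array_key_exists("delay", $options)
        ? ["DelaySeconds" => $options["delay"]]
        : [];
};
$rabbitOptions = function (array $options = []): array {
    return array_key_exists("delay", $options)
        ? ["x-delayed-message" => $options["delay"]]
        : [];
};

$container = new TinyContainer();

// Changing this one registration swaps the adapter for the whole application.
$container->set("queue.options.adapter", function () use ($sqsOptions) {
    return $sqsOptions;
});

$queueOptionsAdapter = $container->get("queue.options.adapter");
$options = $queueOptionsAdapter(["delay" => 20]);
// $options is ["DelaySeconds" => 20]
```

Registering `$rabbitOptions` under the same id would leave every call site untouched, which is the point of hiding the translation behind an invokable.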