Skip to content

Commit

Permalink
[DEVEX-2399]: Make errored queues configurable (#213)
Browse files Browse the repository at this point in the history
* docs: add typespecs to helper module

* feat: add `queue_opts` in setup_dead_lettering/2

* feat!: propagate queue type to dead letter queue

BREAKING CHANGE: if `Helper.declare` is used with a non-default
queue type this will try to change the dead letter queue type
causing an error

* test: x-queue-type is correctly set on dead letter queue

* fix!: avoid nested lists in `original_routing_keys`

* chore: bump to 7.0.0
  • Loading branch information
emilianobovetti authored Jan 27, 2025
1 parent f4f675a commit 0d2672c
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 17 deletions.
61 changes: 59 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,63 @@ and this project adheres to

---

## [7.0.0] - 2025-01-24

### Changed

- When you pass an `x-queue-type` to `Amqpx.Helper.declare/2` now it will be used also for the dead letter queue

This is a breaking change: if you are already using `Helper.declare` with an `x-queue-type` that is not the default type this will try to change the dead letter queue type.

In this case you can either remove the dead letter queue and recreate it with the correct type, or you can migrate to a new dead letter queue with a different name and remove the old one when it's fully drained:

```elixir
queue_spec = %{
queue: "test_1",
opts: [
durable: true,
arguments: [
{"x-queue-type", :longstr, "quorum"},
{"x-dead-letter-exchange", :longstr, "test_dle"},
{"x-dead-letter-routing-key", :longstr, "test_rk"}
]
],
exchanges: [
%{name: "test_exchange", type: :topic, routing_keys: ["test_rk"], opts: [durable: true]},
]
}

# As an example we'll take the following `Amqpx.Helper.declare`
# that creates a queue called `test_1` and a corresponding `test_1_errored`
:ok = Amqpx.Helper.declare(chan, queue_spec)

# Amqpx.Helper.setup_queue/2 takes the exact same queue_spec
# as declare/2 but it doesn't declare the dead letter queue
:ok = Amqpx.Helper.setup_queue(chan, queue_spec)

# Now we can create a new dead letter queue with type "quorum"
# by using a different name, we just need to make sure
# its routing key will match the `x-dead-letter-routing-key` argument
:ok = Amqpx.Helper.setup_dead_lettering(chan, %{
routing_key: "test_rk",
queue: "test_1_dlq",
exchange: "test_dle",
queue_opts: [durable: true, arguments: [{"x-queue-type", :longstr, "quorum"}]]
})

# At this point dead-lettered messages should be delivered to both
# `test_1_errored` and `test_1_dlq`, in this way we can migrate everything
# to the new one and as soon as it empties we can remove the old one
```

- The `original_routing_keys` option accepted by `Amqpx.Helper.setup_dead_lettering/2` must be a `[String.t()]`, if you are passing a `[[String.t()]]` to this function you have to pipe trough `List.flatten` now

### Added

- `Amqpx.Helper.setup_dead_lettering/2` now accepts a `queue_opts` key which will be used as third argument for `Amqpx.Queue.declare/3`

---

## [6.1.3] - 2025-01-23

### Fixed
Expand Down Expand Up @@ -113,8 +170,8 @@ This is due to elixir rabbit not supporting the older versions of the libraries
- ([#129](https://github.com/primait/amqpx/pull/)) Default binding for DLX
queues instead of wildcard


[Unreleased]: https://github.com/primait/amqpx/compare/6.1.3...HEAD
[Unreleased]: https://github.com/primait/amqpx/compare/7.0.0...HEAD
[7.0.0]: https://github.com/primait/amqpx/compare/6.1.3...7.0.0
[6.1.3]: https://github.com/primait/amqpx/compare/6.1.2...6.1.3
[6.1.2]: https://github.com/primait/amqpx/compare/6.1.1...6.1.2
[6.1.1]: https://github.com/primait/amqpx/compare/6.1.0...6.1.1
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rest!
```elixir
def deps do
[
{:amqpx, "~> 6.1.3"}
{:amqpx, "~> 7.0.0"}
]
end
```
Expand Down
83 changes: 70 additions & 13 deletions lib/amqp/helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,51 @@ defmodule Amqpx.Helper do

require Logger

alias Amqpx.{Exchange, Queue}

alias Amqpx.{Basic, Channel, Exchange, Queue}

@dead_letter_queue_defaults [durable: true]

# Supervisor.module_spec() has been introduced with elixir 1.16
# we can remove this when we update the minimum supported version
@type module_spec :: {module, arg :: any}

@type exchange_spec :: %{
name: Basic.exchange(),
type: atom,
routing_keys: [String.t()],
opts: Keyword.t()
}

@type queue_spec :: %{
:queue => Basic.queue(),
:exchanges => [exchange_spec],
optional(:opts) => Keyword.t()
}

@type dead_letter_queue_spec :: %{
:queue => Basic.queue(),
:exchange => Basic.exchange(),
:routing_key => String.t(),
optional(:original_routing_keys) => [String.t()],
optional(:queue_opts) => Keyword.t()
}

@spec manager_supervisor_configuration(Keyword.t()) :: module_spec
def manager_supervisor_configuration(config) do
{Amqpx.Gen.ConnectionManager, %{connection_params: encrypt_password(config)}}
end

@spec consumers_supervisor_configuration([handler_conf :: map]) :: [Supervisor.child_spec()]
def consumers_supervisor_configuration(handlers_conf) do
Enum.map(handlers_conf, &Supervisor.child_spec({Amqpx.Gen.Consumer, &1}, id: UUID.uuid1()))
end

@spec producer_supervisor_configuration(producer_conf :: map) :: module_spec
def producer_supervisor_configuration(producer_conf) do
{Amqpx.Gen.Producer, producer_conf}
end

@spec encrypt_password(Keyword.t()) :: Keyword.t()
def encrypt_password(config) do
case Keyword.get(config, :obfuscate_password, true) do
true ->
Expand All @@ -29,6 +60,7 @@ defmodule Amqpx.Helper do
end
end

@spec get_password(Keyword.t(), Keyword.t() | nil) :: Keyword.value()
def get_password(config, nil) do
case Keyword.get(config, :obfuscate_password, true) do
true ->
Expand All @@ -49,6 +81,7 @@ defmodule Amqpx.Helper do
end
end

@spec declare(Channel.t(), queue_spec) :: :ok | no_return
def declare(
channel,
%{
Expand All @@ -57,22 +90,25 @@ defmodule Amqpx.Helper do
exchanges: exchanges
} = queue
) do
case Enum.find(opts[:arguments], &match?({"x-dead-letter-exchange", :longstr, _}, &1)) do
arguments = Keyword.get(opts, :arguments, [])

case Enum.find(arguments, &match?({"x-dead-letter-exchange", :longstr, _}, &1)) do
{_, _, dle} ->
{dlr_config_key, dlr_config_value} =
case Enum.find(opts[:arguments], &match?({"x-dead-letter-routing-key", :longstr, _}, &1)) do
case Enum.find(arguments, &match?({"x-dead-letter-routing-key", :longstr, _}, &1)) do
{_, _, dlrk} ->
{:routing_key, dlrk}

nil ->
original_routing_keys = Enum.map(exchanges, & &1.routing_keys)
original_routing_keys = Enum.flat_map(exchanges, & &1.routing_keys)
{:original_routing_keys, original_routing_keys}
end

setup_dead_lettering(channel, %{
dlr_config_key => dlr_config_value,
queue: "#{qname}_errored",
exchange: dle
exchange: dle,
queue_opts: set_dead_letter_queue_type(@dead_letter_queue_defaults, arguments)
})

nil ->
Expand All @@ -86,10 +122,11 @@ defmodule Amqpx.Helper do
setup_queue(channel, queue)
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: "", routing_key: dlq}) do
@spec setup_dead_lettering(Channel.t(), dead_letter_queue_spec) :: :ok | {:ok, map} | Basic.error()
def setup_dead_lettering(channel, %{queue: dlq, exchange: "", routing_key: dlq} = spec) do
# DLX will work through [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default)
# since `x-dead-letter-routing-key` matches the queue name
Queue.declare(channel, dlq, durable: true)
Queue.declare(channel, dlq, dead_letter_queue_opts(spec))
end

def setup_dead_lettering(_channel, %{queue: dlq, exchange: "", routing_key: bad_dlq}) do
Expand All @@ -103,24 +140,27 @@ defmodule Amqpx.Helper do
end
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, routing_key: routing_key}) do
def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, routing_key: routing_key} = spec) do
Exchange.declare(channel, exchange, :topic, durable: true)
Queue.declare(channel, dlq, durable: true)
Queue.declare(channel, dlq, dead_letter_queue_opts(spec))
Queue.bind(channel, dlq, exchange, routing_key: routing_key)
end

def setup_dead_lettering(channel, %{queue: dlq, exchange: exchange, original_routing_keys: original_routing_keys}) do
def setup_dead_lettering(
channel,
%{queue: dlq, exchange: exchange, original_routing_keys: original_routing_keys} = spec
) do
Exchange.declare(channel, exchange, :topic, durable: true)
Queue.declare(channel, dlq, durable: true)
Queue.declare(channel, dlq, dead_letter_queue_opts(spec))

original_routing_keys
|> List.flatten()
|> Enum.uniq()
|> Enum.each(fn rk ->
:ok = Queue.bind(channel, dlq, exchange, routing_key: rk)
end)
end

@spec setup_queue(Channel.t(), queue_spec) :: :ok | no_return
def setup_queue(channel, %{
queue: queue,
exchanges: exchanges,
Expand All @@ -140,6 +180,7 @@ defmodule Amqpx.Helper do
Enum.each(exchanges, &setup_exchange(channel, queue, &1))
end

@spec setup_exchange(Channel.t(), Basic.queue(), exchange_spec) :: :ok | Basic.error() | no_return
def setup_exchange(channel, queue, %{
name: name,
type: type,
Expand Down Expand Up @@ -189,6 +230,22 @@ defmodule Amqpx.Helper do
Exchange.declare(channel, name, type)
end

@spec dead_letter_queue_opts(dead_letter_queue_spec) :: Keyword.t()
defp dead_letter_queue_opts(spec) do
Map.get(spec, :queue_opts, @dead_letter_queue_defaults)
end

@spec set_dead_letter_queue_type(Keyword.t(), [{String.t(), atom, any}]) :: Keyword.t()
defp set_dead_letter_queue_type(dlq_opts, queue_args) do
case Enum.find(queue_args, &match?({"x-queue-type", :longstr, _}, &1)) do
nil ->
dlq_opts

queue_type ->
Keyword.update(dlq_opts, :arguments, [queue_type], &[queue_type | &1])
end
end

defp skip_dead_letter_routing_key_check_for,
do: Application.get_env(:amqpx, :skip_dead_letter_routing_key_check_for, [])
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Amqpx.MixProject do
use Mix.Project

@version "6.1.3"
@version "7.0.0"

def project do
[
Expand Down
40 changes: 40 additions & 0 deletions test/helper_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,46 @@ defmodule HelperTest do
Application.put_env(:amqpx, :skip_dead_letter_routing_key_check_for, [])
end

test "declare/2 propagates x-queue-type to dead letter queue declaration",
meta do
queue_name = rand_name()
routing_key_name = rand_name()
exchange_name = rand_name()
dead_letter_queue = "#{queue_name}_errored"

assert :ok ==
Helper.declare(meta[:chan], %{
exchanges: [
%{name: exchange_name, opts: [durable: true], routing_keys: [routing_key_name], type: :topic}
],
opts: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, dead_letter_queue},
{"x-queue-type", :longstr, "quorum"}
]
],
queue: queue_name
})

rabbit_manager = Application.get_env(:amqpx, :rabbit_manager_url).rabbit
amqp_conn = Application.get_env(:amqpx, :amqp_connection)
credentials = Base.encode64("#{amqp_conn[:username]}:#{amqp_conn[:password]}")
headers = [{~c"Authorization", "Basic #{credentials}"}]

assert {:ok, {{_, 200, ~c"OK"}, _headers, body}} =
:httpc.request(:get, {"http://#{rabbit_manager}/api/queues", headers}, [], [])

assert {:ok, queues} = Jason.decode(body)

assert %{"durable" => true, "arguments" => %{"x-queue-type" => "quorum"}} =
Enum.find(queues, fn q -> match?(%{"name" => ^queue_name}, q) end)

assert %{"durable" => true, "arguments" => %{"x-queue-type" => "quorum"}} =
Enum.find(queues, fn q -> match?(%{"name" => ^dead_letter_queue}, q) end)
end

defp rand_name do
:crypto.strong_rand_bytes(8) |> Base.encode64()
end
Expand Down

0 comments on commit 0d2672c

Please sign in to comment.