Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-2274]: Investigate possible DNS caching issues on our rabbitmq libraries #199

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
erlang 23.1
elixir 1.11.2-otp-23
erlang 25.3.2.8
elixir 1.13.4-otp-25
113 changes: 62 additions & 51 deletions lib/amqp/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,30 @@ defmodule Amqpx.Connection do

import Amqpx.Core

alias Amqpx.{Connection, Helper}
alias Amqpx.{Connection, DNS, Helper}

defstruct [:pid]
@type t :: %Connection{pid: pid}

@default_params [
username: "guest",
password: "guest",
virtual_host: "/",
host: ~c"localhost",
port: 5672,
channel_max: 0,
frame_max: 0,
heartbeat: 10,
connection_timeout: 50_000,
ssl_options: :none,
client_properties: [],
socket_options: [],
auth_mechanisms: [
&:amqp_auth_mechanisms.plain/3,
&:amqp_auth_mechanisms.amqplain/3
]
]

@doc """
Opens a new connection without a name.

Expand All @@ -21,7 +40,7 @@ defmodule Amqpx.Connection do
end

@doc """
Opens an new Connection to an Amqpx broker.
Opens a new Connection to an Amqpx broker.

The connections created by this module are supervised under amqp_client's supervision tree.
Please note that connections do not get restarted automatically by the supervision tree in
Expand Down Expand Up @@ -104,9 +123,7 @@ defmodule Amqpx.Connection do
end

def open(options, name) when is_list(options) and (is_binary(name) or name == :undefined) do
options
|> merge_options_to_default()
|> do_open(name)
do_open(options, @default_params, name)
end

def open(uri, options) when is_binary(uri) and is_list(options) do
Expand All @@ -133,31 +150,34 @@ defmodule Amqpx.Connection do
{:ok, t()} | {:error, atom()} | {:error, any()}
def open(uri, name, options) when is_binary(uri) and is_list(options) do
case uri |> String.to_charlist() |> :amqp_uri.parse() do
{:ok, amqp_params} -> amqp_params |> merge_options_to_amqp_params(options) |> do_open(name)
error -> error
{:ok, amqp_params} ->
amqp_params = amqp_params_network(amqp_params)
do_open(options, amqp_params, name)

error ->
error
end
end

@doc false
@spec merge_options_to_amqp_params(tuple, keyword) :: tuple
def merge_options_to_amqp_params(amqp_params, options) do
options = normalize_ssl_options(options)
params = amqp_params_network(amqp_params)
@spec merge_options(keyword, keyword) :: tuple
def merge_options(params, default) do
default = normalize_ssl_options(default)

amqp_params_network(
username: keys_get(options, params, :username),
password: Helper.get_password(options, params),
virtual_host: keys_get(options, params, :virtual_host),
host: options |> keys_get(params, :host) |> to_charlist,
port: keys_get(options, params, :port),
channel_max: keys_get(options, params, :channel_max),
frame_max: keys_get(options, params, :frame_max),
heartbeat: keys_get(options, params, :heartbeat),
connection_timeout: keys_get(options, params, :connection_timeout),
ssl_options: keys_get(options, params, :ssl_options),
client_properties: keys_get(options, params, :client_properties),
socket_options: keys_get(options, params, :socket_options),
auth_mechanisms: keys_get(options, params, :auth_mechanisms)
username: keys_get(params, default, :username),
password: Helper.get_password(params, default),
virtual_host: keys_get(params, default, :virtual_host),
host: params |> keys_get(default, :host) |> to_charlist(),
port: keys_get(params, default, :port),
channel_max: keys_get(params, default, :channel_max),
frame_max: keys_get(params, default, :frame_max),
heartbeat: keys_get(params, default, :heartbeat),
connection_timeout: keys_get(params, default, :connection_timeout),
ssl_options: keys_get(params, default, :ssl_options),
client_properties: keys_get(params, default, :client_properties),
socket_options: keys_get(params, default, :socket_options),
auth_mechanisms: keys_get(params, default, :auth_mechanisms)
)
end

Expand All @@ -166,28 +186,6 @@ defmodule Amqpx.Connection do
Keyword.get(k1, key, Keyword.get(k2, key))
end

defp merge_options_to_default(options) do
amqp_params_network(
username: Keyword.get(options, :username, "guest"),
password: Helper.get_password(options, nil),
virtual_host: Keyword.get(options, :virtual_host, "/"),
host: options |> Keyword.get(:host, 'localhost') |> to_charlist,
port: Keyword.get(options, :port, :undefined),
channel_max: Keyword.get(options, :channel_max, 0),
frame_max: Keyword.get(options, :frame_max, 0),
heartbeat: Keyword.get(options, :heartbeat, 10),
connection_timeout: Keyword.get(options, :connection_timeout, 50_000),
ssl_options: Keyword.get(options, :ssl_options, :none),
client_properties: Keyword.get(options, :client_properties, []),
socket_options: Keyword.get(options, :socket_options, []),
auth_mechanisms:
Keyword.get(options, :auth_mechanisms, [
&:amqp_auth_mechanisms.plain/3,
&:amqp_auth_mechanisms.amqplain/3
])
)
end

@doc """
Closes an open Connection.
"""
Expand All @@ -199,11 +197,24 @@ defmodule Amqpx.Connection do
end
end

defp do_open(amqp_params, name) do
case :amqp_connection.start(amqp_params, name) do
{:ok, pid} -> {:ok, %Connection{pid: pid}}
error -> error
end
@spec do_open(Keyword.t(), Keyword.t(), String.t()) ::
{:ok, t()} | {:error, atom()} | {:error, any()}
defp do_open(params, default_params, name) do
params
|> keys_get(default_params, :host)
|> to_charlist()
|> DNS.resolve_ips()
|> Enum.reduce_while(nil, fn ip, _ ->
amqp_params = params |> Keyword.put(:host, ip) |> merge_options(default_params)

case :amqp_connection.start(amqp_params, name) do
{:ok, pid} ->
{:halt, {:ok, %Connection{pid: pid}}}

error ->
{:cont, error}
end
end)
end

defp normalize_ssl_options(options) when is_list(options) do
Expand Down
20 changes: 20 additions & 0 deletions lib/amqp/dns.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Amqpx.DNS do
@moduledoc """
Module to resolve a DNS record into an A record.
"""

@doc """
Resolves the IP addresses of a given hostname. If the hostname
cannot be resolved, it returns the hostname itself.
"""
@spec resolve_ips(charlist) :: [charlist]
def resolve_ips(host) do
case :inet.gethostbyname(host) do
{:ok, {:hostent, _, _, _, _, ips}} ->
ips |> Enum.map(&:inet.ntoa/1) |> Enum.dedup()

_ ->
[host]
end
end
end
119 changes: 96 additions & 23 deletions test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,29 @@ defmodule ConnectionTest do
use ExUnit.Case

import Amqpx.Core
alias Amqpx.Connection
import Mock

alias Amqpx.{Connection, DNS}

@obfuscate_password false

test "open connection with host as binary" do
assert {:ok, conn} =
Connection.open(
username: "amqpx",
password: "amqpx",
host: "rabbit",
obfuscate_password: @obfuscate_password
)
@invalid_ip '192.168.1.1'
@valid_ip '192.168.1.2'

@open_options [
username: "amqpx",
password: "amqpx",
host: "rabbit",
obfuscate_password: @obfuscate_password
]

test "open connection with host as binary" do
assert {:ok, conn} = Connection.open(@open_options)
assert :ok = Connection.close(conn)
end

test "open connection with host as char list" do
assert {:ok, conn} =
Connection.open(
username: "amqpx",
password: "amqpx",
host: 'rabbit',
obfuscate_password: @obfuscate_password
)

assert {:ok, conn} = Connection.open(@open_options)
assert :ok = Connection.close(conn)
end

Expand All @@ -38,7 +36,7 @@ defmodule ConnectionTest do
test "open connection using both uri and options" do
assert {:ok, conn} =
Connection.open("amqp://amqpx:amqpx@nonexistent:5672",
host: 'rabbit',
host: ~c"rabbit",
obfuscate_password: @obfuscate_password
)

Expand All @@ -48,7 +46,7 @@ defmodule ConnectionTest do
test "open connection with uri, name, and options" do
assert {:ok, conn} =
Connection.open("amqp://amqpx:amqpx@nonexistent:5672", "my-connection",
host: 'rabbit',
host: ~c"rabbit",
obfuscate_password: @obfuscate_password
)

Expand All @@ -59,13 +57,88 @@ defmodule ConnectionTest do
uri = "amqp://guest:amqpx@rabbit:5672"
{:ok, amqp_params} = uri |> String.to_charlist() |> :amqp_uri.parse()

record =
Connection.merge_options_to_amqp_params(amqp_params, username: "amqpx", obfuscate_password: @obfuscate_password)

params = [username: "amqpx", obfuscate_password: @obfuscate_password]
default = amqp_params_network(amqp_params)
record = Connection.merge_options(params, default)
params = amqp_params_network(record)

assert params[:username] == "amqpx"
assert params[:password] == "amqpx"
assert params[:host] == 'rabbit'
assert params[:host] == ~c"rabbit"
end

describe "connecting using dns name resolution" do
test "Connection.open fails if none of the ips are reachable" do
with_mock DNS, resolve_ips: fn _host -> [@invalid_ip] end do
start = fn params, _name ->
params = amqp_params_network(params)

if params[:host] == @valid_ip do
{:ok, :c.pid(0, 250, 0)}
else
{:error, :econnrefused}
end
end

with_mock :amqp_connection, start: start do
assert {:error, :econnrefused} = Connection.open(@open_options)
end
end
end

test "Connection.open retry every ip until one succeed" do
with_mock DNS, resolve_ips: fn _host -> [@invalid_ip, @valid_ip] end do
pid = :c.pid(0, 250, 0)

start = fn params, _name ->
params = amqp_params_network(params)

if params[:host] == @valid_ip do
{:ok, pid}
else
{:error, :econnrefused}
end
end

with_mock :amqp_connection, start: start do
assert {:ok, %Connection{pid: ^pid}} = Connection.open(@open_options)
end
end
end

test "Connection.open retry every ip until one succeed, reversed" do
with_mock DNS, resolve_ips: fn _host -> [@valid_ip, @invalid_ip] end do
pid = :c.pid(0, 250, 0)

start = fn params, _name ->
params = amqp_params_network(params)

if params[:host] == @valid_ip do
{:ok, pid}
else
{:error, :econnrefused}
end
end

with_mock :amqp_connection, start: start do
assert {:ok, %Connection{pid: ^pid}} = Connection.open(@open_options)
end
end
end
end

describe "ip resolution" do
test "localhost is resolved as 127.0.0.1" do
assert [~c"127.0.0.1"] = DNS.resolve_ips(~c"localhost")
end

test "rabbit can be resolved into an ip" do
assert [ip] = DNS.resolve_ips(~c"rabbit")
assert {:ok, _} = :inet.parse_address(ip)
end

test "unknown host will not be resolved" do
assert [~c"nonexistent"] = DNS.resolve_ips(~c"nonexistent")
end
end
end
Loading