Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-2274]: Investigate possible DNS caching issues on our rabbitmq libraries #199

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
erlang 23.1
elixir 1.11.2-otp-23
erlang 25.3.2.8
elixir 1.13.4-otp-25
122 changes: 73 additions & 49 deletions lib/amqp/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,25 @@ defmodule Amqpx.Connection do
defstruct [:pid]
@type t :: %Connection{pid: pid}

@default_params [
username: "guest",
password: "guest",
virtual_host: "/",
host: ~c"localhost",
port: 5672,
channel_max: 0,
frame_max: 0,
heartbeat: 10,
connection_timeout: 50_000,
ssl_options: :none,
client_properties: [],
socket_options: [],
auth_mechanisms: [
&:amqp_auth_mechanisms.plain/3,
&:amqp_auth_mechanisms.amqplain/3
]
]

@doc """
Opens a new connection without a name.

Expand Down Expand Up @@ -104,9 +123,7 @@ defmodule Amqpx.Connection do
end

def open(options, name) when is_list(options) and (is_binary(name) or name == :undefined) do
options
|> merge_options_to_default()
|> do_open(name)
do_open(options, @default_params, name)
end

def open(uri, options) when is_binary(uri) and is_list(options) do
Expand All @@ -133,31 +150,34 @@ defmodule Amqpx.Connection do
{:ok, t()} | {:error, atom()} | {:error, any()}
def open(uri, name, options) when is_binary(uri) and is_list(options) do
case uri |> String.to_charlist() |> :amqp_uri.parse() do
{:ok, amqp_params} -> amqp_params |> merge_options_to_amqp_params(options) |> do_open(name)
error -> error
{:ok, amqp_params} ->
amqp_params = amqp_params_network(amqp_params)
do_open(options, amqp_params, name)

error ->
error
end
end

@doc false
@spec merge_options_to_amqp_params(tuple, keyword) :: tuple
def merge_options_to_amqp_params(amqp_params, options) do
options = normalize_ssl_options(options)
params = amqp_params_network(amqp_params)
@spec merge_options(keyword, keyword) :: tuple
def merge_options(params, default) do
default = normalize_ssl_options(default)

amqp_params_network(
username: keys_get(options, params, :username),
password: Helper.get_password(options, params),
virtual_host: keys_get(options, params, :virtual_host),
host: options |> keys_get(params, :host) |> to_charlist,
port: keys_get(options, params, :port),
channel_max: keys_get(options, params, :channel_max),
frame_max: keys_get(options, params, :frame_max),
heartbeat: keys_get(options, params, :heartbeat),
connection_timeout: keys_get(options, params, :connection_timeout),
ssl_options: keys_get(options, params, :ssl_options),
client_properties: keys_get(options, params, :client_properties),
socket_options: keys_get(options, params, :socket_options),
auth_mechanisms: keys_get(options, params, :auth_mechanisms)
username: keys_get(params, default, :username),
password: Helper.get_password(params, default),
virtual_host: keys_get(params, default, :virtual_host),
host: params |> keys_get(default, :host) |> to_charlist(),
port: keys_get(params, default, :port),
channel_max: keys_get(params, default, :channel_max),
frame_max: keys_get(params, default, :frame_max),
heartbeat: keys_get(params, default, :heartbeat),
connection_timeout: keys_get(params, default, :connection_timeout),
ssl_options: keys_get(params, default, :ssl_options),
client_properties: keys_get(params, default, :client_properties),
socket_options: keys_get(params, default, :socket_options),
auth_mechanisms: keys_get(params, default, :auth_mechanisms)
)
end

Expand All @@ -166,28 +186,6 @@ defmodule Amqpx.Connection do
Keyword.get(k1, key, Keyword.get(k2, key))
end

defp merge_options_to_default(options) do
amqp_params_network(
username: Keyword.get(options, :username, "guest"),
password: Helper.get_password(options, nil),
virtual_host: Keyword.get(options, :virtual_host, "/"),
host: options |> Keyword.get(:host, 'localhost') |> to_charlist,
port: Keyword.get(options, :port, :undefined),
channel_max: Keyword.get(options, :channel_max, 0),
frame_max: Keyword.get(options, :frame_max, 0),
heartbeat: Keyword.get(options, :heartbeat, 10),
connection_timeout: Keyword.get(options, :connection_timeout, 50_000),
ssl_options: Keyword.get(options, :ssl_options, :none),
client_properties: Keyword.get(options, :client_properties, []),
socket_options: Keyword.get(options, :socket_options, []),
auth_mechanisms:
Keyword.get(options, :auth_mechanisms, [
&:amqp_auth_mechanisms.plain/3,
&:amqp_auth_mechanisms.amqplain/3
])
)
end

@doc """
Closes an open Connection.
"""
Expand All @@ -199,11 +197,24 @@ defmodule Amqpx.Connection do
end
end

defp do_open(amqp_params, name) do
case :amqp_connection.start(amqp_params, name) do
{:ok, pid} -> {:ok, %Connection{pid: pid}}
error -> error
end
@spec do_open(Keyword.t(), Keyword.t(), String.t()) ::
{:ok, t()} | {:error, atom()} | {:error, any()}
defp do_open(params, default_params, name) do
params
|> keys_get(default_params, :host)
|> to_charlist()
|> resolve_ips()
|> Enum.reduce_while(nil, fn ip, _ ->
amqp_params = params |> Keyword.put(:host, ip) |> merge_options(default_params)

case :amqp_connection.start(amqp_params, name) do
{:ok, pid} ->
{:halt, {:ok, %Connection{pid: pid}}}

error ->
{:cont, error}
end
end)
end

defp normalize_ssl_options(options) when is_list(options) do
Expand All @@ -217,4 +228,17 @@ defmodule Amqpx.Connection do
end

defp normalize_ssl_options(options), do: options

# Resolves the IP addresses of a given hostname. If the hostname
# cannot be resolved, it returns the hostname itself.
@spec resolve_ips(charlist) :: [charlist]
def resolve_ips(host) do
case :inet.gethostbyname(host) do
{:ok, {:hostent, _, _, _, _, ips}} ->
ips |> Enum.map(&:inet.ntoa/1) |> Enum.dedup()

_ ->
[host]
end
end
end
29 changes: 22 additions & 7 deletions test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule ConnectionTest do
Connection.open(
username: "amqpx",
password: "amqpx",
host: 'rabbit',
host: ~c"rabbit",
obfuscate_password: @obfuscate_password
)

Expand All @@ -38,7 +38,7 @@ defmodule ConnectionTest do
test "open connection using both uri and options" do
assert {:ok, conn} =
Connection.open("amqp://amqpx:amqpx@nonexistent:5672",
host: 'rabbit',
host: ~c"rabbit",
obfuscate_password: @obfuscate_password
)

Expand All @@ -48,7 +48,7 @@ defmodule ConnectionTest do
test "open connection with uri, name, and options" do
assert {:ok, conn} =
Connection.open("amqp://amqpx:amqpx@nonexistent:5672", "my-connection",
host: 'rabbit',
host: ~c"rabbit",
obfuscate_password: @obfuscate_password
)

Expand All @@ -59,13 +59,28 @@ defmodule ConnectionTest do
uri = "amqp://guest:amqpx@rabbit:5672"
{:ok, amqp_params} = uri |> String.to_charlist() |> :amqp_uri.parse()

record =
Connection.merge_options_to_amqp_params(amqp_params, username: "amqpx", obfuscate_password: @obfuscate_password)

params = [username: "amqpx", obfuscate_password: @obfuscate_password]
default = amqp_params_network(amqp_params)
record = Connection.merge_options(params, default)
params = amqp_params_network(record)

assert params[:username] == "amqpx"
assert params[:password] == "amqpx"
assert params[:host] == 'rabbit'
assert params[:host] == ~c"rabbit"
end

describe "ip resolution" do
test "localhost is resolved as 127.0.0.1" do
assert [~c"127.0.0.1"] = Connection.resolve_ips(~c"localhost")
end

test "rabbit can be resolved into an ip" do
assert [ip] = Connection.resolve_ips(~c"rabbit")
assert {:ok, _} = :inet.parse_address(ip)
end

test "unknown host will not be resolved" do
assert [~c"nonexistent"] = Connection.resolve_ips(~c"nonexistent")
end
end
end
Loading