diff --git a/.tool-versions b/.tool-versions index 90bd0be..65ec180 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -erlang 23.1 -elixir 1.11.2-otp-23 +erlang 25.3.2.8 +elixir 1.13.4-otp-25 diff --git a/lib/amqp/connection.ex b/lib/amqp/connection.ex index 8d03877..b747da8 100644 --- a/lib/amqp/connection.ex +++ b/lib/amqp/connection.ex @@ -5,11 +5,30 @@ defmodule Amqpx.Connection do import Amqpx.Core - alias Amqpx.{Connection, Helper} + alias Amqpx.{Connection, DNS, Helper} defstruct [:pid] @type t :: %Connection{pid: pid} + @default_params [ + username: "guest", + password: "guest", + virtual_host: "/", + host: ~c"localhost", + port: 5672, + channel_max: 0, + frame_max: 0, + heartbeat: 10, + connection_timeout: 50_000, + ssl_options: :none, + client_properties: [], + socket_options: [], + auth_mechanisms: [ + &:amqp_auth_mechanisms.plain/3, + &:amqp_auth_mechanisms.amqplain/3 + ] + ] + @doc """ Opens a new connection without a name. @@ -21,7 +40,7 @@ defmodule Amqpx.Connection do end @doc """ - Opens an new Connection to an Amqpx broker. + Opens a new Connection to an Amqpx broker. The connections created by this module are supervised under amqp_client's supervision tree. Please note that connections do not get restarted automatically by the supervision tree in @@ -104,9 +123,7 @@ defmodule Amqpx.Connection do end def open(options, name) when is_list(options) and (is_binary(name) or name == :undefined) do - options - |> merge_options_to_default() - |> do_open(name) + do_open(options, @default_params, name) end def open(uri, options) when is_binary(uri) and is_list(options) do @@ -133,31 +150,34 @@ defmodule Amqpx.Connection do {:ok, t()} | {:error, atom()} | {:error, any()} def open(uri, name, options) when is_binary(uri) and is_list(options) do case uri |> String.to_charlist() |> :amqp_uri.parse() do - {:ok, amqp_params} -> amqp_params |> merge_options_to_amqp_params(options) |> do_open(name) - error -> error + {:ok, amqp_params} -> + amqp_params = amqp_params_network(amqp_params) + do_open(options, amqp_params, name) + + error -> + error end end @doc false - @spec merge_options_to_amqp_params(tuple, keyword) :: tuple - def merge_options_to_amqp_params(amqp_params, options) do - options = normalize_ssl_options(options) - params = amqp_params_network(amqp_params) + @spec merge_options(keyword, keyword) :: tuple + def merge_options(params, default) do + default = normalize_ssl_options(default) amqp_params_network( - username: keys_get(options, params, :username), - password: Helper.get_password(options, params), - virtual_host: keys_get(options, params, :virtual_host), - host: options |> keys_get(params, :host) |> to_charlist, - port: keys_get(options, params, :port), - channel_max: keys_get(options, params, :channel_max), - frame_max: keys_get(options, params, :frame_max), - heartbeat: keys_get(options, params, :heartbeat), - connection_timeout: keys_get(options, params, :connection_timeout), - ssl_options: keys_get(options, params, :ssl_options), - client_properties: keys_get(options, params, :client_properties), - socket_options: keys_get(options, params, :socket_options), - auth_mechanisms: keys_get(options, params, :auth_mechanisms) + username: keys_get(params, default, :username), + password: Helper.get_password(params, default), + virtual_host: keys_get(params, default, :virtual_host), + host: params |> keys_get(default, :host) |> to_charlist(), + port: keys_get(params, default, :port), + channel_max: keys_get(params, default, :channel_max), + frame_max: keys_get(params, default, :frame_max), + heartbeat: keys_get(params, default, :heartbeat), + connection_timeout: keys_get(params, default, :connection_timeout), + ssl_options: keys_get(params, default, :ssl_options), + client_properties: keys_get(params, default, :client_properties), + socket_options: keys_get(params, default, :socket_options), + auth_mechanisms: keys_get(params, default, :auth_mechanisms) ) end @@ -166,28 +186,6 @@ defmodule Amqpx.Connection do Keyword.get(k1, key, Keyword.get(k2, key)) end - defp merge_options_to_default(options) do - amqp_params_network( - username: Keyword.get(options, :username, "guest"), - password: Helper.get_password(options, nil), - virtual_host: Keyword.get(options, :virtual_host, "/"), - host: options |> Keyword.get(:host, 'localhost') |> to_charlist, - port: Keyword.get(options, :port, :undefined), - channel_max: Keyword.get(options, :channel_max, 0), - frame_max: Keyword.get(options, :frame_max, 0), - heartbeat: Keyword.get(options, :heartbeat, 10), - connection_timeout: Keyword.get(options, :connection_timeout, 50_000), - ssl_options: Keyword.get(options, :ssl_options, :none), - client_properties: Keyword.get(options, :client_properties, []), - socket_options: Keyword.get(options, :socket_options, []), - auth_mechanisms: - Keyword.get(options, :auth_mechanisms, [ - &:amqp_auth_mechanisms.plain/3, - &:amqp_auth_mechanisms.amqplain/3 - ]) - ) - end - @doc """ Closes an open Connection. """ @@ -199,11 +197,24 @@ defmodule Amqpx.Connection do end end - defp do_open(amqp_params, name) do - case :amqp_connection.start(amqp_params, name) do - {:ok, pid} -> {:ok, %Connection{pid: pid}} - error -> error - end + @spec do_open(Keyword.t(), Keyword.t(), String.t()) :: + {:ok, t()} | {:error, atom()} | {:error, any()} + defp do_open(params, default_params, name) do + params + |> keys_get(default_params, :host) + |> to_charlist() + |> DNS.resolve_ips() + |> Enum.reduce_while(nil, fn ip, _ -> + amqp_params = params |> Keyword.put(:host, ip) |> merge_options(default_params) + + case :amqp_connection.start(amqp_params, name) do + {:ok, pid} -> + {:halt, {:ok, %Connection{pid: pid}}} + + error -> + {:cont, error} + end + end) end defp normalize_ssl_options(options) when is_list(options) do diff --git a/lib/amqp/dns.ex b/lib/amqp/dns.ex new file mode 100644 index 0000000..e3c3192 --- /dev/null +++ b/lib/amqp/dns.ex @@ -0,0 +1,20 @@ +defmodule Amqpx.DNS do + @moduledoc """ + Module to resolve a DNS record into an A record. + """ + + @doc """ + Resolves the IP addresses of a given hostname. If the hostname + cannot be resolved, it returns the hostname itself. + """ + @spec resolve_ips(charlist) :: [charlist] + def resolve_ips(host) do + case :inet.gethostbyname(host) do + {:ok, {:hostent, _, _, _, _, ips}} -> + ips |> Enum.map(&:inet.ntoa/1) |> Enum.dedup() + + _ -> + [host] + end + end +end diff --git a/test/connection_test.exs b/test/connection_test.exs index 41490d7..d251a47 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -2,31 +2,29 @@ defmodule ConnectionTest do use ExUnit.Case import Amqpx.Core - alias Amqpx.Connection + import Mock + + alias Amqpx.{Connection, DNS} @obfuscate_password false - test "open connection with host as binary" do - assert {:ok, conn} = - Connection.open( - username: "amqpx", - password: "amqpx", - host: "rabbit", - obfuscate_password: @obfuscate_password - ) + @invalid_ip '192.168.1.1' + @valid_ip '192.168.1.2' + + @open_options [ + username: "amqpx", + password: "amqpx", + host: "rabbit", + obfuscate_password: @obfuscate_password + ] + test "open connection with host as binary" do + assert {:ok, conn} = Connection.open(@open_options) assert :ok = Connection.close(conn) end test "open connection with host as char list" do - assert {:ok, conn} = - Connection.open( - username: "amqpx", - password: "amqpx", - host: 'rabbit', - obfuscate_password: @obfuscate_password - ) - + assert {:ok, conn} = Connection.open(@open_options) assert :ok = Connection.close(conn) end @@ -38,7 +36,7 @@ defmodule ConnectionTest do test "open connection using both uri and options" do assert {:ok, conn} = Connection.open("amqp://amqpx:amqpx@nonexistent:5672", - host: 'rabbit', + host: ~c"rabbit", obfuscate_password: @obfuscate_password ) @@ -48,7 +46,7 @@ defmodule ConnectionTest do test "open connection with uri, name, and options" do assert {:ok, conn} = Connection.open("amqp://amqpx:amqpx@nonexistent:5672", "my-connection", - host: 'rabbit', + host: ~c"rabbit", obfuscate_password: @obfuscate_password ) @@ -59,13 +57,88 @@ defmodule ConnectionTest do uri = "amqp://guest:amqpx@rabbit:5672" {:ok, amqp_params} = uri |> String.to_charlist() |> :amqp_uri.parse() - record = - Connection.merge_options_to_amqp_params(amqp_params, username: "amqpx", obfuscate_password: @obfuscate_password) - + params = [username: "amqpx", obfuscate_password: @obfuscate_password] + default = amqp_params_network(amqp_params) + record = Connection.merge_options(params, default) params = amqp_params_network(record) assert params[:username] == "amqpx" assert params[:password] == "amqpx" - assert params[:host] == 'rabbit' + assert params[:host] == ~c"rabbit" + end + + describe "connecting using dns name resolution" do + test "Connection.open fails if none of the ips are reachable" do + with_mock DNS, resolve_ips: fn _host -> [@invalid_ip] end do + start = fn params, _name -> + params = amqp_params_network(params) + + if params[:host] == @valid_ip do + {:ok, :c.pid(0, 250, 0)} + else + {:error, :econnrefused} + end + end + + with_mock :amqp_connection, start: start do + assert {:error, :econnrefused} = Connection.open(@open_options) + end + end + end + + test "Connection.open retry every ip until one succeed" do + with_mock DNS, resolve_ips: fn _host -> [@invalid_ip, @valid_ip] end do + pid = :c.pid(0, 250, 0) + + start = fn params, _name -> + params = amqp_params_network(params) + + if params[:host] == @valid_ip do + {:ok, pid} + else + {:error, :econnrefused} + end + end + + with_mock :amqp_connection, start: start do + assert {:ok, %Connection{pid: ^pid}} = Connection.open(@open_options) + end + end + end + + test "Connection.open retry every ip until one succeed, reversed" do + with_mock DNS, resolve_ips: fn _host -> [@valid_ip, @invalid_ip] end do + pid = :c.pid(0, 250, 0) + + start = fn params, _name -> + params = amqp_params_network(params) + + if params[:host] == @valid_ip do + {:ok, pid} + else + {:error, :econnrefused} + end + end + + with_mock :amqp_connection, start: start do + assert {:ok, %Connection{pid: ^pid}} = Connection.open(@open_options) + end + end + end + end + + describe "ip resolution" do + test "localhost is resolved as 127.0.0.1" do + assert [~c"127.0.0.1"] = DNS.resolve_ips(~c"localhost") + end + + test "rabbit can be resolved into an ip" do + assert [ip] = DNS.resolve_ips(~c"rabbit") + assert {:ok, _} = :inet.parse_address(ip) + end + + test "unknown host will not be resolved" do + assert [~c"nonexistent"] = DNS.resolve_ips(~c"nonexistent") + end end end