From c0a75e5327b13d97ca92de586b43fba490f139af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Fri, 28 Feb 2025 15:24:29 +0000 Subject: [PATCH 1/3] fix: Add clustered testing; reduce impact of counters in flakiness --- lib/realtime/rate_counter/rate_counter.ex | 2 +- mix.exs | 2 +- test/realtime/database_test.exs | 47 +++++++++++ test/realtime/rpc_test.exs | 96 +++++++++++++---------- test/realtime/tenants/connect_test.exs | 6 +- test/support/clustered.ex | 66 ++++++++++++++++ test/support/containers.ex | 41 ++++++---- test/support/generators.ex | 1 + 8 files changed, 200 insertions(+), 61 deletions(-) create mode 100644 test/support/clustered.ex diff --git a/lib/realtime/rate_counter/rate_counter.ex b/lib/realtime/rate_counter/rate_counter.ex index 0148feeb8..a18529a5f 100644 --- a/lib/realtime/rate_counter/rate_counter.ex +++ b/lib/realtime/rate_counter/rate_counter.ex @@ -74,7 +74,7 @@ defmodule Realtime.RateCounter do ]) Enum.each(keys, fn {{_, _, key}, {pid, _}} -> - GenServer.stop(pid) + if Process.alive?(pid), do: GenServer.stop(pid) Cachex.del!(@cache, key) end) diff --git a/mix.exs b/mix.exs index b546ff5b7..66ca72cf1 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.34.37", + version: "2.34.38", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/realtime/database_test.exs b/test/realtime/database_test.exs index 736d42379..ac8e8155d 100644 --- a/test/realtime/database_test.exs +++ b/test/realtime/database_test.exs @@ -5,6 +5,9 @@ defmodule Realtime.DatabaseTest do import ExUnit.CaptureLog alias Realtime.Database + alias Realtime.Rpc + alias Realtime.Tenants.Connect + doctest Realtime.Database def handle_telemetry(event, metadata, _, pid: pid), do: send(pid, {event, metadata}) @@ -202,6 +205,50 @@ defmodule Realtime.DatabaseTest do end end + @aux_mod (quote do + defmodule Aux do + def 
checker(transaction_conn) do + Postgrex.query!(transaction_conn, "SELECT 1", []) + end + + def error(transaction_conn) do + Postgrex.query!(transaction_conn, "SELECT 1/0", []) + end + + def exception(_) do + raise RuntimeError, "💣" + end + end + end) + + Code.eval_quoted(@aux_mod) + + describe "transaction/1 in clustered mode" do + test "success call returns output" do + tenant = Containers.checkout_tenant() + {:ok, node} = Clustered.start(@aux_mod) + {:ok, db_conn} = Rpc.call(node, Connect, :lookup_or_start_connection, [tenant.external_id]) + assert {:ok, %Postgrex.Result{rows: [[1]]}} = Database.transaction(db_conn, &Aux.checker/1) + on_exit(fn -> Containers.checkin_tenant(tenant) end) + end + + test "handles database errors" do + tenant = Containers.checkout_tenant() + {:ok, node} = Clustered.start(@aux_mod) + {:ok, db_conn} = Rpc.call(node, Connect, :lookup_or_start_connection, [tenant.external_id]) + assert {:error, %Postgrex.Error{}} = Database.transaction(db_conn, &Aux.error/1) + on_exit(fn -> Containers.checkin_tenant(tenant) end) + end + + test "handles exception" do + tenant = Containers.checkout_tenant() + {:ok, node} = Clustered.start(@aux_mod) + {:ok, db_conn} = Rpc.call(node, Connect, :lookup_or_start_connection, [tenant.external_id]) + assert {:error, %RuntimeError{}} = Database.transaction(db_conn, &Aux.exception/1) + on_exit(fn -> Containers.checkin_tenant(tenant) end) + end + end + describe "pool_size_by_application_name/2" do test "returns the number of connections per application name" do assert Database.pool_size_by_application_name("realtime_connect", %{}) == 1 diff --git a/test/realtime/rpc_test.exs b/test/realtime/rpc_test.exs index 613628b8f..0311d531b 100644 --- a/test/realtime/rpc_test.exs +++ b/test/realtime/rpc_test.exs @@ -5,108 +5,118 @@ defmodule Realtime.RpcTest do alias Realtime.Rpc - defmodule TestRpc do - def test_raise, do: raise("test") - def test_timeout, do: Process.sleep(200) - def test_success, do: {:ok, "success"} - end + 
@aux_mod (quote do + defmodule TestRpc do + def test_raise, do: raise("test") + def test_timeout, do: Process.sleep(200) + def test_success, do: {:ok, "success"} + end + end) + + Code.eval_quoted(@aux_mod) def handle_telemetry(event, measurements, metadata, pid: pid), do: send(pid, {event, measurements, metadata}) setup do + {:ok, node} = Clustered.start(@aux_mod) :telemetry.attach(__MODULE__, [:realtime, :rpc], &__MODULE__.handle_telemetry/4, pid: self()) on_exit(fn -> :telemetry.detach(__MODULE__) end) - :ok + + %{node: node} end describe "call/5" do - test "successful RPC call returns exactly what the original function returns" do - assert {:ok, "success"} = Rpc.call(node(), TestRpc, :test_success, []) + test "successful RPC call returns exactly what the original function returns", %{node: node} do + assert {:ok, "success"} = Rpc.call(node, TestRpc, :test_success, []) + origin_node = node() assert_receive {[:realtime, :rpc], %{latency: _}, %{ - mod: Realtime.RpcTest.TestRpc, + mod: TestRpc, func: :test_success, - origin_node: :nonode@nohost, - target_node: :nonode@nohost + origin_node: ^origin_node, + target_node: ^node }} end - test "raised exceptions are properly caught and logged" do - assert {:badrpc, - {:EXIT, - {%RuntimeError{message: "test"}, - [ - {Realtime.RpcTest.TestRpc, :test_raise, 0, - [file: ~c"test/realtime/rpc_test.exs", line: 9, error_info: %{module: Exception}]} - ]}}} = - Rpc.call(node(), TestRpc, :test_raise, []) + test "raised exceptions are properly caught and logged", %{node: node} do + assert {:badrpc, {:EXIT, {%RuntimeError{message: "test"}, [{TestRpc, :test_raise, 0, _}]}}} = + Rpc.call(node, TestRpc, :test_raise, []) + + origin_node = node() assert_receive {[:realtime, :rpc], %{latency: _}, %{ - mod: Realtime.RpcTest.TestRpc, + mod: TestRpc, func: :test_raise, - origin_node: :nonode@nohost, - target_node: :nonode@nohost + origin_node: ^origin_node, + target_node: ^node }} end - test "timeouts are properly caught and logged" do + test 
"timeouts are properly caught and logged", %{node: node} do assert {:badrpc, :timeout} = - Rpc.call(node(), TestRpc, :test_timeout, [], timeout: 100) + Rpc.call(node, TestRpc, :test_timeout, [], timeout: 100) + + origin_node = node() assert_receive {[:realtime, :rpc], %{latency: _}, %{ - mod: Realtime.RpcTest.TestRpc, + mod: TestRpc, func: :test_timeout, - origin_node: :nonode@nohost, - target_node: :nonode@nohost + origin_node: ^origin_node, + target_node: ^node }} end end describe "enhanced_call/5" do - test "successful RPC call returns exactly what the original function returns" do - assert {:ok, "success"} = Rpc.enhanced_call(node(), TestRpc, :test_success) + test "successful RPC call returns exactly what the original function returns", %{node: node} do + assert {:ok, "success"} = Rpc.enhanced_call(node, TestRpc, :test_success) + origin_node = node() assert_receive {[:realtime, :rpc], %{latency: _}, %{ - mod: Realtime.RpcTest.TestRpc, + mod: TestRpc, func: :test_success, - origin_node: :nonode@nohost, - target_node: :nonode@nohost, + origin_node: ^origin_node, + target_node: ^node, success: true }} end - test "raised exceptions are properly caught and logged" do + test "raised exceptions are properly caught and logged", %{node: node} do assert capture_log(fn -> assert {:error, :rpc_error, %RuntimeError{message: "test"}} = - Rpc.enhanced_call(node(), TestRpc, :test_raise) + Rpc.enhanced_call(node, TestRpc, :test_raise) end) =~ "ErrorOnRpcCall" + origin_node = node() + assert_receive {[:realtime, :rpc], %{latency: _}, %{ - mod: Realtime.RpcTest.TestRpc, + mod: TestRpc, func: :test_raise, - origin_node: :nonode@nohost, - target_node: :nonode@nohost, + origin_node: ^origin_node, + target_node: ^node, success: false }} end - test "timeouts are properly caught and logged" do + test "timeouts are properly caught and logged", %{node: node} do assert capture_log(fn -> assert {:error, :rpc_error, :timeout} = - Rpc.enhanced_call(node(), TestRpc, :test_timeout, [], 
timeout: 100) + Rpc.enhanced_call(node, TestRpc, :test_timeout, [], timeout: 100) end) =~ "ErrorOnRpcCall" + origin_node = node() + assert_receive {[:realtime, :rpc], %{latency: 0}, %{ - mod: Realtime.RpcTest.TestRpc, + mod: TestRpc, func: :test_timeout, - origin_node: :nonode@nohost, - target_node: :nonode@nohost + origin_node: ^origin_node, + target_node: ^node }} end end diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index 4c0f3a6fd..ce46bbc82 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -12,9 +12,11 @@ defmodule Realtime.Tenants.ConnectTest do alias Realtime.UsersCounter setup do - tenant = Containers.checkout_tenant() :ets.delete_all_objects(Connect) - on_exit(fn -> Containers.checkin_tenant(tenant) end) + + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) %{tenant: tenant} end diff --git a/test/support/clustered.ex b/test/support/clustered.ex new file mode 100644 index 000000000..b6361859d --- /dev/null +++ b/test/support/clustered.ex @@ -0,0 +1,66 @@ +defmodule Clustered do + @moduledoc """ + Uses the gist https://gist.github.com/ityonemo/177cbc96f8c8722bfc4d127ff9baec62 to start a node for testing + """ + @doc """ + Starts a node for testing. + + Can receive an auxiliary module to be evaluated in the node so you are able to setup functions within the test context and outside of the normal code context + + e.g. 
+ ``` + @aux_mod (quote do + defmodule Aux do + def checker(res), do: res + end + end) + + Code.eval_quoted(@aux_mod) + test "clustered call" do + tenant = Containers.checkout_tenant() + {:ok, node} = Clustered.start(@aux_mod) + assert :ok = :rpc.call(node, Aux, :checker, [:ok]) + end + ``` + """ + def start(aux_mod \\ nil) do + :net_kernel.start([:"main@127.0.0.1"]) + :erlang.set_cookie(:cookie) + + {:ok, pid, node} = + :peer.start_link(%{ + name: :peer.random_name(), + host: ~c"127.0.0.1", + longnames: true, + connection: :standard_io + }) + + :peer.call(pid, :erlang, :set_cookie, [:cookie]) + + Node.connect(node) + + :ok = :erpc.call(node, :code, :add_paths, [:code.get_path()]) + + for {app_name, _, _} <- Application.loaded_applications(), + {key, val} <- Application.get_all_env(app_name) do + :ok = :erpc.call(node, Application, :put_env, [app_name, key, val]) + end + + # We need to override this value as the current implementation overrides the string with a map leading to errors + :ok = :erpc.call(node, Application, :put_env, [:realtime, :jwt_claim_validators, "{}"]) + + {:ok, _} = :erpc.call(node, Application, :ensure_all_started, [:mix]) + :ok = :erpc.call(node, Mix, :env, [Mix.env()]) + + Enum.map( + [:logger, :runtime_tools, :prom_ex, :mix, :os_mon, :realtime], + fn app -> {:ok, _} = :erpc.call(node, Application, :ensure_all_started, [app]) end + ) + + if aux_mod do + {{:module, _, _, _}, []} = :erpc.call(node, Code, :eval_quoted, [aux_mod]) + end + + {:ok, node} + end +end diff --git a/test/support/containers.ex b/test/support/containers.ex index 5c662e2a2..8c1d3e057 100644 --- a/test/support/containers.ex +++ b/test/support/containers.ex @@ -1,6 +1,9 @@ defmodule Containers do - alias Realtime.Tenants.Connect + alias Realtime.Tenants alias Realtime.Database + alias Realtime.GenCounter + alias Realtime.RateCounter + alias Realtime.Tenants.Connect alias Realtime.Tenants.Migrations import ExUnit.CaptureLog @@ -49,9 +52,31 @@ defmodule Containers do def 
checkout_tenant(run_migrations? \\ false) do tenants = :ets.select(:containers, [{{:_, %{using?: :"$1", tenant: :"$2"}}, [{:==, :"$1", false}], [:"$2"]}]) tenant = Enum.random(tenants) - :ets.insert(:containers, {tenant.external_id, %{tenant: tenant, using?: true}}) + settings = Database.from_tenant(tenant, "realtime_test", :stop) + + settings = %{settings | max_restarts: 0, ssl: false} + {:ok, conn} = Database.connect_db(settings) + + Postgrex.transaction(conn, fn db_conn -> + pid = Connect.whereis(tenant.external_id) + if pid && Process.alive?(pid), do: Connect.shutdown(tenant.external_id) + + tenant + |> Tenants.limiter_keys() + |> Enum.each(fn key -> + RateCounter.stop(tenant.external_id) + GenCounter.stop(tenant.external_id) + RateCounter.new(key) + GenCounter.new(key) + end) + + Postgrex.query!(db_conn, "DROP SCHEMA realtime CASCADE", []) + Postgrex.query!(db_conn, "CREATE SCHEMA realtime", []) + :ok + end) + if run_migrations? do Migrations.run_migrations(tenant) {:ok, pid} = Database.connect(tenant, "realtime_test", :stop) @@ -62,18 +87,6 @@ defmodule Containers do end def checkin_tenant(tenant) do - settings = Database.from_tenant(tenant, "realtime_test", :stop) - - settings = %{settings | max_restarts: 0, ssl: false} - {:ok, conn} = Database.connect_db(settings) - - Database.transaction(conn, fn db_conn -> - pid = Connect.whereis(tenant.external_id) - if pid && Process.alive?(pid), do: Connect.shutdown(tenant.external_id) - Postgrex.query(db_conn, "DROP SCHEMA realtime CASCADE", []) - Postgrex.query(db_conn, "CREATE SCHEMA realtime", []) - end) - :ets.insert(:containers, {tenant.external_id, %{tenant: tenant, using?: false}}) end diff --git a/test/support/generators.ex b/test/support/generators.ex index b670445ec..26713f493 100644 --- a/test/support/generators.ex +++ b/test/support/generators.ex @@ -45,6 +45,7 @@ defmodule Generators do tenant end + @spec message_fixture(Realtime.Api.Tenant.t()) :: any() def message_fixture(tenant, override \\ %{}) do 
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) Realtime.Tenants.Migrations.create_partitions(db_conn) From 076716bea44bf7d2df544a6a0b9ccffb3021c040 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Fri, 28 Feb 2025 16:13:20 +0000 Subject: [PATCH 2/3] remove unnecessary test. this type of use case would kill the connect and then a newer connection would reset the connection to the database. As such the test condition isn't applicable to our use case --- lib/realtime/tenants/listen.ex | 5 +---- test/realtime/tenants/connect_test.exs | 24 ------------------------ test/support/containers.ex | 2 ++ 3 files changed, 3 insertions(+), 28 deletions(-) diff --git a/lib/realtime/tenants/listen.ex b/lib/realtime/tenants/listen.ex index bcc0499dd..6468d7cc2 100644 --- a/lib/realtime/tenants/listen.ex +++ b/lib/realtime/tenants/listen.ex @@ -87,10 +87,7 @@ defmodule Realtime.Tenants.Listen do """ @spec whereis(String.t()) :: pid() | nil def whereis(tenant_id) do - case Registry.lookup( - Realtime.Registry.Unique, - {Postgrex.Notifications, :tenant_id, tenant_id} - ) do + case Registry.lookup(Realtime.Registry.Unique, {Postgrex.Notifications, :tenant_id, tenant_id}) do [{pid, _}] -> pid [] -> nil end diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index ce46bbc82..cc98b43b2 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -280,30 +280,6 @@ defmodule Realtime.Tenants.ConnectTest do assert Process.alive?(listen_pid) end - test "on database disconnect, connection is killed to all components", %{tenant: tenant} do - assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id) - old_pid = Connect.whereis(tenant.external_id) - Process.sleep(1000) - - old_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id) - old_listen_connection_pid = Listen.whereis(tenant.external_id) - - assert 
Process.alive?(old_replication_connection_pid) - assert Process.alive?(old_listen_connection_pid) - - System.cmd("docker", ["stop", "realtime-test-#{tenant.external_id}"]) - Process.sleep(500) - System.cmd("docker", ["start", "realtime-test-#{tenant.external_id}"]) - - Process.sleep(3000) - refute Process.alive?(old_pid) - refute Process.alive?(old_replication_connection_pid) - refute Process.alive?(old_listen_connection_pid) - - assert ReplicationConnection.whereis(tenant.external_id) == nil - assert Listen.whereis(tenant.external_id) == nil - end - test "handles max_wal_senders by logging the correct operational code" do tenant = tenant_fixture() tenant = Containers.initialize(tenant, true, true) diff --git a/test/support/containers.ex b/test/support/containers.ex index 8c1d3e057..b4356b232 100644 --- a/test/support/containers.ex +++ b/test/support/containers.ex @@ -44,6 +44,7 @@ defmodule Containers do Migrations.run_migrations(tenant) {:ok, pid} = Database.connect(tenant, "realtime_test", :stop) Migrations.create_partitions(pid) + Process.exit(pid, :normal) end tenant @@ -83,6 +84,7 @@ defmodule Containers do Migrations.create_partitions(pid) end + Process.exit(conn, :normal) tenant end From 257b8cd369942780078819bab9b0e34551431a0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Fri, 28 Feb 2025 16:28:53 +0000 Subject: [PATCH 3/3] fix healthcheck by using node() on assert --- test/realtime_web/controllers/tenant_controller_test.exs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/realtime_web/controllers/tenant_controller_test.exs b/test/realtime_web/controllers/tenant_controller_test.exs index cd409b743..16bfb87ea 100644 --- a/test/realtime_web/controllers/tenant_controller_test.exs +++ b/test/realtime_web/controllers/tenant_controller_test.exs @@ -270,7 +270,7 @@ defmodule RealtimeWeb.TenantControllerTest do "db_connected" => false, "connected_cluster" => 0, "region" => "us-east-1", - "node" => "nonode@nohost" 
+ "node" => "#{node()}" } == data end end @@ -292,7 +292,7 @@ defmodule RealtimeWeb.TenantControllerTest do "db_connected" => false, "connected_cluster" => 1, "region" => "us-east-1", - "node" => "nonode@nohost" + "node" => "#{node()}" } == data end end @@ -321,7 +321,7 @@ defmodule RealtimeWeb.TenantControllerTest do "db_connected" => true, "connected_cluster" => 1, "region" => "us-east-1", - "node" => "nonode@nohost" + "node" => "#{node()}" } == data end end