Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add clustered testing; reduce impact of counters in flakiness #1316

Merged
merged 3 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/realtime/rate_counter/rate_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ defmodule Realtime.RateCounter do
])

Enum.each(keys, fn {{_, _, key}, {pid, _}} ->
GenServer.stop(pid)
if Process.alive?(pid), do: GenServer.stop(pid)
Cachex.del!(@cache, key)
end)

Expand Down
5 changes: 1 addition & 4 deletions lib/realtime/tenants/listen.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ defmodule Realtime.Tenants.Listen do
"""
@spec whereis(String.t()) :: pid() | nil
def whereis(tenant_id) do
case Registry.lookup(
Realtime.Registry.Unique,
{Postgrex.Notifications, :tenant_id, tenant_id}
) do
case Registry.lookup(Realtime.Registry.Unique, {Postgrex.Notifications, :tenant_id, tenant_id}) do
[{pid, _}] -> pid
[] -> nil
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.37",
version: "2.34.38",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
47 changes: 47 additions & 0 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ defmodule Realtime.DatabaseTest do
import ExUnit.CaptureLog

alias Realtime.Database
alias Realtime.Rpc
alias Realtime.Tenants.Connect

doctest Realtime.Database
def handle_telemetry(event, metadata, _, pid: pid), do: send(pid, {event, metadata})

Expand Down Expand Up @@ -202,6 +205,50 @@ defmodule Realtime.DatabaseTest do
end
end

# Helper module kept as quoted code so the very same definition can be
# evaluated in this VM (see `Code.eval_quoted/1` below) and also handed to
# `Clustered.start/1` by the clustered tests.
@aux_mod (quote do
defmodule Aux do
# Runs a trivial `SELECT 1` on the given connection.
def checker(transaction_conn) do
Postgrex.query!(transaction_conn, "SELECT 1", [])
end

# Runs a division by zero on the given connection; the clustered tests
# expect this to surface as a `Postgrex.Error`.
def error(transaction_conn) do
Postgrex.query!(transaction_conn, "SELECT 1/0", [])
end

# Ignores its argument and raises a `RuntimeError`.
def exception(_) do
raise RuntimeError, "💣"
end
end
end)

# Define `Aux` locally so captures such as `&Aux.checker/1` resolve in this module.
Code.eval_quoted(@aux_mod)

describe "transaction/1 in clustered mode" do
  # Shared fixture for every test in this group: check out a tenant, start a
  # peer node with the `Aux` helper module loaded, and obtain a database
  # connection through an RPC call to that node.
  setup do
    tenant = Containers.checkout_tenant()
    # Register the cleanup immediately after checkout so the tenant is checked
    # back in even when a later setup step or a test assertion fails.
    on_exit(fn -> Containers.checkin_tenant(tenant) end)

    {:ok, node} = Clustered.start(@aux_mod)
    {:ok, db_conn} = Rpc.call(node, Connect, :lookup_or_start_connection, [tenant.external_id])

    %{db_conn: db_conn}
  end

  test "success call returns output", %{db_conn: db_conn} do
    assert {:ok, %Postgrex.Result{rows: [[1]]}} = Database.transaction(db_conn, &Aux.checker/1)
  end

  test "handles database errors", %{db_conn: db_conn} do
    assert {:error, %Postgrex.Error{}} = Database.transaction(db_conn, &Aux.error/1)
  end

  test "handles exception", %{db_conn: db_conn} do
    assert {:error, %RuntimeError{}} = Database.transaction(db_conn, &Aux.exception/1)
  end
end

describe "pool_size_by_application_name/2" do
test "returns the number of connections per application name" do
assert Database.pool_size_by_application_name("realtime_connect", %{}) == 1
Expand Down
96 changes: 53 additions & 43 deletions test/realtime/rpc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,108 +5,118 @@ defmodule Realtime.RpcTest do

alias Realtime.Rpc

defmodule TestRpc do
def test_raise, do: raise("test")
def test_timeout, do: Process.sleep(200)
def test_success, do: {:ok, "success"}
end
@aux_mod (quote do
defmodule TestRpc do
def test_raise, do: raise("test")
def test_timeout, do: Process.sleep(200)
def test_success, do: {:ok, "success"}
end
end)

Code.eval_quoted(@aux_mod)

def handle_telemetry(event, measurements, metadata, pid: pid), do: send(pid, {event, measurements, metadata})

setup do
{:ok, node} = Clustered.start(@aux_mod)
:telemetry.attach(__MODULE__, [:realtime, :rpc], &__MODULE__.handle_telemetry/4, pid: self())
on_exit(fn -> :telemetry.detach(__MODULE__) end)
:ok

%{node: node}
end

describe "call/5" do
test "successful RPC call returns exactly what the original function returns" do
assert {:ok, "success"} = Rpc.call(node(), TestRpc, :test_success, [])
test "successful RPC call returns exactly what the original function returns", %{node: node} do
assert {:ok, "success"} = Rpc.call(node, TestRpc, :test_success, [])
origin_node = node()

assert_receive {[:realtime, :rpc], %{latency: _},
%{
mod: Realtime.RpcTest.TestRpc,
mod: TestRpc,
func: :test_success,
origin_node: :nonode@nohost,
target_node: :nonode@nohost
origin_node: ^origin_node,
target_node: ^node
}}
end

test "raised exceptions are properly caught and logged" do
assert {:badrpc,
{:EXIT,
{%RuntimeError{message: "test"},
[
{Realtime.RpcTest.TestRpc, :test_raise, 0,
[file: ~c"test/realtime/rpc_test.exs", line: 9, error_info: %{module: Exception}]}
]}}} =
Rpc.call(node(), TestRpc, :test_raise, [])
test "raised exceptions are properly caught and logged", %{node: node} do
assert {:badrpc, {:EXIT, {%RuntimeError{message: "test"}, [{TestRpc, :test_raise, 0, _}]}}} =
Rpc.call(node, TestRpc, :test_raise, [])

origin_node = node()

assert_receive {[:realtime, :rpc], %{latency: _},
%{
mod: Realtime.RpcTest.TestRpc,
mod: TestRpc,
func: :test_raise,
origin_node: :nonode@nohost,
target_node: :nonode@nohost
origin_node: ^origin_node,
target_node: ^node
}}
end

test "timeouts are properly caught and logged" do
test "timeouts are properly caught and logged", %{node: node} do
assert {:badrpc, :timeout} =
Rpc.call(node(), TestRpc, :test_timeout, [], timeout: 100)
Rpc.call(node, TestRpc, :test_timeout, [], timeout: 100)

origin_node = node()

assert_receive {[:realtime, :rpc], %{latency: _},
%{
mod: Realtime.RpcTest.TestRpc,
mod: TestRpc,
func: :test_timeout,
origin_node: :nonode@nohost,
target_node: :nonode@nohost
origin_node: ^origin_node,
target_node: ^node
}}
end
end

describe "enhanced_call/5" do
test "successful RPC call returns exactly what the original function returns" do
assert {:ok, "success"} = Rpc.enhanced_call(node(), TestRpc, :test_success)
test "successful RPC call returns exactly what the original function returns", %{node: node} do
assert {:ok, "success"} = Rpc.enhanced_call(node, TestRpc, :test_success)
origin_node = node()

assert_receive {[:realtime, :rpc], %{latency: _},
%{
mod: Realtime.RpcTest.TestRpc,
mod: TestRpc,
func: :test_success,
origin_node: :nonode@nohost,
target_node: :nonode@nohost,
origin_node: ^origin_node,
target_node: ^node,
success: true
}}
end

test "raised exceptions are properly caught and logged" do
test "raised exceptions are properly caught and logged", %{node: node} do
assert capture_log(fn ->
assert {:error, :rpc_error, %RuntimeError{message: "test"}} =
Rpc.enhanced_call(node(), TestRpc, :test_raise)
Rpc.enhanced_call(node, TestRpc, :test_raise)
end) =~ "ErrorOnRpcCall"

origin_node = node()

assert_receive {[:realtime, :rpc], %{latency: _},
%{
mod: Realtime.RpcTest.TestRpc,
mod: TestRpc,
func: :test_raise,
origin_node: :nonode@nohost,
target_node: :nonode@nohost,
origin_node: ^origin_node,
target_node: ^node,
success: false
}}
end

test "timeouts are properly caught and logged" do
test "timeouts are properly caught and logged", %{node: node} do
assert capture_log(fn ->
assert {:error, :rpc_error, :timeout} =
Rpc.enhanced_call(node(), TestRpc, :test_timeout, [], timeout: 100)
Rpc.enhanced_call(node, TestRpc, :test_timeout, [], timeout: 100)
end) =~ "ErrorOnRpcCall"

origin_node = node()

assert_receive {[:realtime, :rpc], %{latency: 0},
%{
mod: Realtime.RpcTest.TestRpc,
mod: TestRpc,
func: :test_timeout,
origin_node: :nonode@nohost,
target_node: :nonode@nohost
origin_node: ^origin_node,
target_node: ^node
}}
end
end
Expand Down
30 changes: 4 additions & 26 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ defmodule Realtime.Tenants.ConnectTest do
alias Realtime.UsersCounter

setup do
tenant = Containers.checkout_tenant()
:ets.delete_all_objects(Connect)
on_exit(fn -> Containers.checkin_tenant(tenant) end)

tenant = tenant_fixture()
tenant = Containers.initialize(tenant, true, true)
on_exit(fn -> Containers.stop_container(tenant) end)

%{tenant: tenant}
end
Expand Down Expand Up @@ -278,30 +280,6 @@ defmodule Realtime.Tenants.ConnectTest do
assert Process.alive?(listen_pid)
end

test "on database disconnect, connection is killed to all components", %{tenant: tenant} do
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
old_pid = Connect.whereis(tenant.external_id)
Process.sleep(1000)

old_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
old_listen_connection_pid = Listen.whereis(tenant.external_id)

assert Process.alive?(old_replication_connection_pid)
assert Process.alive?(old_listen_connection_pid)

System.cmd("docker", ["stop", "realtime-test-#{tenant.external_id}"])
Process.sleep(500)
System.cmd("docker", ["start", "realtime-test-#{tenant.external_id}"])

Process.sleep(3000)
refute Process.alive?(old_pid)
refute Process.alive?(old_replication_connection_pid)
refute Process.alive?(old_listen_connection_pid)

assert ReplicationConnection.whereis(tenant.external_id) == nil
assert Listen.whereis(tenant.external_id) == nil
end

test "handles max_wal_senders by logging the correct operational code" do
tenant = tenant_fixture()
tenant = Containers.initialize(tenant, true, true)
Expand Down
6 changes: 3 additions & 3 deletions test/realtime_web/controllers/tenant_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ defmodule RealtimeWeb.TenantControllerTest do
"db_connected" => false,
"connected_cluster" => 0,
"region" => "us-east-1",
"node" => "nonode@nohost"
"node" => "#{node()}"
} == data
end
end
Expand All @@ -292,7 +292,7 @@ defmodule RealtimeWeb.TenantControllerTest do
"db_connected" => false,
"connected_cluster" => 1,
"region" => "us-east-1",
"node" => "nonode@nohost"
"node" => "#{node()}"
} == data
end
end
Expand Down Expand Up @@ -321,7 +321,7 @@ defmodule RealtimeWeb.TenantControllerTest do
"db_connected" => true,
"connected_cluster" => 1,
"region" => "us-east-1",
"node" => "nonode@nohost"
"node" => "#{node()}"
} == data
end
end
Expand Down
66 changes: 66 additions & 0 deletions test/support/clustered.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule Clustered do
  @moduledoc """
  Test helper that starts an extra node for clustered tests.

  Based on the gist https://gist.github.com/ityonemo/177cbc96f8c8722bfc4d127ff9baec62.
  """

  # Cookie set on both the local node and the peer node.
  @cookie :cookie

  # Applications started on the peer node, in this order.
  @apps [:logger, :runtime_tools, :prom_ex, :mix, :os_mon, :realtime]

  @doc """
  Starts a node for testing and returns `{:ok, node}`.

  The peer receives the local code paths and a copy of the application
  environment of every loaded application, then starts the applications
  needed to run `:realtime`.

  Optionally receives a quoted auxiliary module that is evaluated on the new
  node, so functions defined within the test context (outside of the normal
  code context) are callable there.

  ## Example

      @aux_mod (quote do
                  defmodule Aux do
                    def checker(res), do: res
                  end
                end)

      Code.eval_quoted(@aux_mod)

      test "clustered call" do
        {:ok, node} = Clustered.start(@aux_mod)
        assert :ok = :rpc.call(node, Aux, :checker, [:ok])
      end
  """
  @spec start(Macro.t() | nil) :: {:ok, node()}
  def start(aux_mod \\ nil) do
    # The result is intentionally not matched, so calling start/1 again when
    # distribution is already running does not crash the caller.
    :net_kernel.start([:"main@127.0.0.1"])
    :erlang.set_cookie(@cookie)

    # start_link/1 links the peer to the calling process.
    {:ok, pid, node} =
      :peer.start_link(%{
        name: :peer.random_name(),
        host: ~c"127.0.0.1",
        longnames: true,
        connection: :standard_io
      })

    :peer.call(pid, :erlang, :set_cookie, [@cookie])

    Node.connect(node)

    :ok = :erpc.call(node, :code, :add_paths, [:code.get_path()])

    # Mirror the local application environment onto the peer node.
    for {app_name, _, _} <- Application.loaded_applications(),
        {key, val} <- Application.get_all_env(app_name) do
      :ok = :erpc.call(node, Application, :put_env, [app_name, key, val])
    end

    # We need to override this value as the current implementation overrides the string with a map leading to errors
    :ok = :erpc.call(node, Application, :put_env, [:realtime, :jwt_claim_validators, "{}"])

    # Mix is started first so its environment can be set before the other applications boot.
    {:ok, _} = :erpc.call(node, Application, :ensure_all_started, [:mix])
    :ok = :erpc.call(node, Mix, :env, [Mix.env()])

    Enum.each(@apps, fn app ->
      {:ok, _} = :erpc.call(node, Application, :ensure_all_started, [app])
    end)

    if aux_mod do
      {{:module, _, _, _}, []} = :erpc.call(node, Code, :eval_quoted, [aux_mod])
    end

    {:ok, node}
  end
end
Loading
Loading