From 6aff7dc915d028349e28ae0d434e74d1622fa3b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Thu, 20 Feb 2025 23:30:17 +0000 Subject: [PATCH] fix: Improve failure telemetry --- lib/realtime/monitoring/prom_ex.ex | 8 ++- .../monitoring/prom_ex/plugins/channels.ex | 20 +++++++ .../monitoring/prom_ex/plugins/tenants.ex | 10 +--- lib/realtime/rpc.ex | 50 ++++++++++------- .../channels/realtime_channel/logging.ex | 14 +++++ mix.exs | 2 +- test/realtime/database_test.exs | 19 ++++--- test/realtime/monitoring/prom_ex_test.exs | 19 +++++++ test/realtime/rpc_test.exs | 53 +++++++++++++++++-- .../realtime_channel/logging_test.exs | 19 ++++++- 10 files changed, 168 insertions(+), 46 deletions(-) create mode 100644 lib/realtime/monitoring/prom_ex/plugins/channels.ex create mode 100644 test/realtime/monitoring/prom_ex_test.exs diff --git a/lib/realtime/monitoring/prom_ex.ex b/lib/realtime/monitoring/prom_ex.ex index 571575a92..3b66b6f94 100644 --- a/lib/realtime/monitoring/prom_ex.ex +++ b/lib/realtime/monitoring/prom_ex.ex @@ -1,7 +1,10 @@ defmodule Realtime.PromEx do - alias Realtime.PromEx.Plugins.{OsMon, Phoenix, Tenants, Tenant} - alias Realtime.Nodes + alias Realtime.PromEx.Plugins.Channels + alias Realtime.PromEx.Plugins.OsMon + alias Realtime.PromEx.Plugins.Phoenix + alias Realtime.PromEx.Plugins.Tenant + alias Realtime.PromEx.Plugins.Tenants @moduledoc """ Be sure to add the following to finish setting up PromEx: @@ -70,6 +73,7 @@ defmodule Realtime.PromEx do {OsMon, poll_rate: poll_rate}, {Tenants, poll_rate: poll_rate}, {Tenant, poll_rate: poll_rate}, + {Channels, poll_rate: poll_rate}, {PromEx.Plugins.Ecto, otp_app: :realtime, poll_rate: poll_rate, metric_prefix: [:ecto]} ] end diff --git a/lib/realtime/monitoring/prom_ex/plugins/channels.ex b/lib/realtime/monitoring/prom_ex/plugins/channels.ex new file mode 100644 index 000000000..357838f21 --- /dev/null +++ b/lib/realtime/monitoring/prom_ex/plugins/channels.ex @@ -0,0 +1,20 @@ +defmodule Realtime.PromEx.Plugins.Channels do + @moduledoc """ + Realtime channels monitoring plugin for PromEx + """ + use PromEx.Plugin + require Logger + + @impl true + def event_metrics(_opts) do + Event.build(:realtime, [ + counter( + [:realtime, :channel, :error], + event_name: [:realtime, :channel, :error], + measurement: :code, + tags: [:code], + description: "Count of errors in the Realtime channels initialization" + ) + ]) + end +end diff --git a/lib/realtime/monitoring/prom_ex/plugins/tenants.ex b/lib/realtime/monitoring/prom_ex/plugins/tenants.ex index bcb6dd012..ddb91d5d1 100644 --- a/lib/realtime/monitoring/prom_ex/plugins/tenants.ex +++ b/lib/realtime/monitoring/prom_ex/plugins/tenants.ex @@ -10,11 +10,7 @@ defmodule Realtime.PromEx.Plugins.Tenants do @event_connected [:prom_ex, :plugin, :realtime, :tenants, :connected] @impl true - def event_metrics(opts) do - rpc_metrics(opts) - end - - defp rpc_metrics(_opts) do + def event_metrics(_) do Event.build(:realtime, [ distribution( [:realtime, :rpc], @@ -58,9 +54,7 @@ defmodule Realtime.PromEx.Plugins.Tenants do -1 end - execute_metrics(@event_connected, %{ - connected: connected - }) + execute_metrics(@event_connected, %{connected: connected}) end defp execute_metrics(event, metrics) do diff --git a/lib/realtime/rpc.ex b/lib/realtime/rpc.ex index 92a5f5ddc..8d2f6f720 100644 --- a/lib/realtime/rpc.ex +++ b/lib/realtime/rpc.ex @@ -12,18 +12,11 @@ defmodule Realtime.Rpc do def call(node, mod, func, args, opts \\ []) do timeout = Keyword.get(opts, :timeout, Application.get_env(:realtime, :rpc_timeout)) {latency, response} = :timer.tc(fn -> :rpc.call(node, mod, func, args, timeout) end) - tenant = Keyword.get(opts, :tenant, nil) Telemetry.execute( - [:realtime, :tenants, :rpc], + [:realtime, :rpc], %{latency: latency}, - %{ - tenant: tenant, - mod: mod, - func: func, - target_node: node, - origin_node: node() - } + %{mod: mod, func: func, target_node: node, origin_node: node()} ) response @@ -36,21 +29,36 @@ defmodule Realtime.Rpc do def enhanced_call(node, mod, func, args \\ [], opts \\ []) do timeout = Keyword.get(opts, :timeout, Application.get_env(:realtime, :rpc_timeout)) - with {latency, {status, _} = response} <- + with {latency, response} <- :timer.tc(fn -> :erpc.call(node, mod, func, args, timeout) end) do - Telemetry.execute( - [:realtime, :rpc], - %{latency: latency, success?: status == :ok}, - %{mod: mod, func: func, target_node: node, origin_node: node()} - ) - case response do - {status, _} when status in [:ok, :error] -> response - _ -> {:error, response} + {:ok, _} -> + Telemetry.execute( + [:realtime, :rpc], + %{latency: latency, success?: true}, + %{mod: mod, func: func, target_node: node, origin_node: node()} + ) + + response + + {:error, response} -> + Telemetry.execute( + [:realtime, :rpc], + %{latency: latency, success?: false}, + %{mod: mod, func: func, target_node: node, origin_node: node()} + ) + + {:error, response} end end catch kind, reason -> + Telemetry.execute( + [:realtime, :rpc], + %{latency: 0, success?: false}, + %{mod: mod, func: func, target_node: node, origin_node: node()} + ) + log_error( "ErrorOnRpcCall", %{target: node, mod: mod, func: func, error: {kind, reason}}, @@ -59,6 +67,10 @@ defmodule Realtime.Rpc do target: node ) - {:error, "RPC call error"} + case reason do + {:erpc, :timeout} -> {:error, :rpc_error, :timeout} + {:exception, error, _} -> {:error, :rpc_error, error} + _ -> {:error, reason} + end end end diff --git a/lib/realtime_web/channels/realtime_channel/logging.ex b/lib/realtime_web/channels/realtime_channel/logging.ex index 96e500c76..e2dfad852 100644 --- a/lib/realtime_web/channels/realtime_channel/logging.ex +++ b/lib/realtime_web/channels/realtime_channel/logging.ex @@ -4,6 +4,7 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do """ require Logger import Realtime.Logs + alias Realtime.Telemetry @doc """ Logs messages according to user options given on config @@ -20,6 +21,17 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do socket end + @doc """ + List of errors that are system triggered and not user driven + """ + def system_errors, + do: [ + "UnableToSetPolicies", + "InitializingProjectConnection", + "DatabaseConnectionIssue", + "UnknownErrorOnChannel" + ] + @doc """ Logs errors in an expected format """ @@ -32,6 +44,8 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do def log_error_message(level, code, error, metadata \\ []) def log_error_message(:error, code, error, metadata) do + if code in system_errors(), do: Telemetry.execute([:realtime, :channel, :error], %{code: code}, %{code: code}) + log_error(code, error, metadata) {:error, %{reason: error}} end diff --git a/mix.exs b/mix.exs index 42e7977b7..32a8f1b1d 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.34.29", + version: "2.34.30", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/realtime/database_test.exs b/test/realtime/database_test.exs index 2de886d07..d6380cad5 100644 --- a/test/realtime/database_test.exs +++ b/test/realtime/database_test.exs @@ -3,14 +3,15 @@ defmodule Realtime.DatabaseTest do use Realtime.DataCase, async: false import ExUnit.CaptureLog - import Mock alias Realtime.Database doctest Realtime.Database + def handle_telemetry(event, metadata, _, pid: pid), do: send(pid, {event, metadata}) setup do tenant = tenant_fixture() - + :telemetry.attach(__MODULE__, [:realtime, :database, :transaction], &__MODULE__.handle_telemetry/4, pid: self()) + on_exit(fn -> :telemetry.detach(__MODULE__) end) # Ensure no replication slot is present before the test Cleanup.ensure_no_replication_slot() @@ -163,15 +164,13 @@ defmodule Realtime.DatabaseTest do test "with telemetry event defined, emits telemetry event", %{db_conn: db_conn} do event = [:realtime, :database, :transaction] - with_mock Realtime.Telemetry, execute: fn _, _, _ -> :ok end do - Database.transaction( - db_conn, - fn conn -> Postgrex.query!(conn, "SELECT pg_sleep(6)", []) end, - telemetry: event - ) + Database.transaction( + db_conn, + fn conn -> Postgrex.query!(conn, "SELECT pg_sleep(6)", []) end, + telemetry: event + ) - assert_called(Realtime.Telemetry.execute(event, %{latency: :_}, %{})) - end + assert_receive {^event, %{latency: _}} end end diff --git a/test/realtime/monitoring/prom_ex_test.exs b/test/realtime/monitoring/prom_ex_test.exs new file mode 100644 index 000000000..8878fe890 --- /dev/null +++ b/test/realtime/monitoring/prom_ex_test.exs @@ -0,0 +1,19 @@ +defmodule Realtime.PromExTest do + use ExUnit.Case + doctest Realtime.PromEx + alias Realtime.PromEx + + describe "get_metrics/0" do + test "builds metrics in prometheus format which includes host region and id" do + metrics = PromEx.get_metrics() + + assert String.contains?( + metrics, + "# HELP beam_system_schedulers_online_info The number of scheduler threads that are online." + ) + + assert String.contains?(metrics, "# TYPE beam_system_schedulers_online_info gauge") + assert String.contains?(metrics, "beam_system_schedulers_online_info{host=\"nohost\",region=\"\",id=\"nohost\"}") + end + end +end diff --git a/test/realtime/rpc_test.exs b/test/realtime/rpc_test.exs index 88540fcb9..fbec53b40 100644 --- a/test/realtime/rpc_test.exs +++ b/test/realtime/rpc_test.exs @@ -1,30 +1,73 @@ defmodule Realtime.RpcTest do use ExUnit.Case - alias Realtime.Rpc + import ExUnit.CaptureLog + alias Realtime.Rpc + defmodule TestRpc do def test_raise, do: raise("test") - def test_timeout, do: Process.sleep(1000) + def test_timeout, do: Process.sleep(200) def test_success, do: {:ok, "success"} end + def handle_telemetry(event, metadata, _, pid: pid), do: send(pid, {event, metadata}) + + setup do + :telemetry.attach(__MODULE__, [:realtime, :rpc], &__MODULE__.handle_telemetry/4, pid: self()) + on_exit(fn -> :telemetry.detach(__MODULE__) end) + :ok + end + + describe "call/5" do + test "successful RPC call returns exactly what the original function returns" do + assert {:ok, "success"} = Rpc.call(node(), TestRpc, :test_success, []) + assert_receive {[:realtime, :rpc], %{latency: _}} + end + + test "raised exceptions are properly caught and logged" do + assert {:badrpc, + {:EXIT, + {%RuntimeError{message: "test"}, + [ + {Realtime.RpcTest.TestRpc, :test_raise, 0, + [file: ~c"test/realtime/rpc_test.exs", line: 9, error_info: %{module: Exception}]} + ]}}} = + Rpc.call(node(), TestRpc, :test_raise, []) + + assert_receive {[:realtime, :rpc], %{latency: _}} + end + + test "timeouts are properly caught and logged" do + assert {:badrpc, :timeout} = + Rpc.call(node(), TestRpc, :test_timeout, [], timeout: 100) + + assert_receive {[:realtime, :rpc], %{latency: _}} + end + end + describe "enhanced_call/5" do test "successful RPC call returns exactly what the original function returns" do assert {:ok, "success"} = Rpc.enhanced_call(node(), TestRpc, :test_success) + assert_receive {[:realtime, :rpc], %{latency: _, success?: true}} end test "raised exceptions are properly caught and logged" do assert capture_log(fn -> - assert {:error, "RPC call error"} = Rpc.enhanced_call(node(), TestRpc, :test_raise) + assert {:error, :rpc_error, %RuntimeError{message: "test"}} = + Rpc.enhanced_call(node(), TestRpc, :test_raise) end) =~ "ErrorOnRpcCall" + + assert_receive {[:realtime, :rpc], %{latency: _, success?: false}} end test "timeouts are properly caught and logged" do assert capture_log(fn -> - assert {:error, "RPC call error"} = - Rpc.enhanced_call(node(), TestRpc, :test_timeout, 500) + assert {:error, :rpc_error, :timeout} = + Rpc.enhanced_call(node(), TestRpc, :test_timeout, [], timeout: 100) end) =~ "ErrorOnRpcCall" + + assert_receive {[:realtime, :rpc], %{latency: 0, success?: false}} end end end diff --git a/test/realtime_web/channels/realtime_channel/logging_test.exs b/test/realtime_web/channels/realtime_channel/logging_test.exs index a00cd9152..ae9dabd53 100644 --- a/test/realtime_web/channels/realtime_channel/logging_test.exs +++ b/test/realtime_web/channels/realtime_channel/logging_test.exs @@ -4,7 +4,12 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do import ExUnit.CaptureLog alias RealtimeWeb.RealtimeChannel.Logging + def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {event, measures, metadata}) + setup do + :telemetry.attach(__MODULE__, [:realtime, :channel, :error], &__MODULE__.handle_telemetry/4, pid: self()) + on_exit(fn -> :telemetry.detach(__MODULE__) end) + level = Logger.level() Logger.configure(level: :debug) on_exit(fn -> Logger.configure(level: level) end) @@ -38,9 +43,21 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do test "handles error level errors" do assert capture_log(fn -> - result = Logging.log_error_message(:error, :test_code, "test error") + result = Logging.log_error_message(:error, "TestCodeError", "test error") assert {:error, %{reason: "test error"}} = result end) =~ "test error" end + + test "only emits telemetry for system errors" do + errors = Logging.system_errors() + + for error <- errors do + Logging.log_error_message(:error, error, "test error") + assert_receive {[:realtime, :channel, :error], %{code: ^error}, %{code: ^error}} + end + + Logging.log_error_message(:error, "DatabaseConnectionIssue", "test error") + refute_receive {[:realtime, :channel, :error], %{code: "DatabaseConnectionIssue"}, %{code: "UnableToSetPolicies"}} + end end end