Skip to content

Commit

Permalink
fix: Handle janitor long living connections
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Feb 11, 2025
1 parent c7ea914 commit 05d302e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
17 changes: 12 additions & 5 deletions lib/realtime/tenants/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ defmodule Realtime.Tenants.Janitor do
alias Realtime.Tenants
alias Realtime.Tenants.Migrations

@table_name Realtime.Tenants.Connect
@matchspec [{:"$1", [], [:"$1"]}]

@type t :: %__MODULE__{
timer: pos_integer() | nil,
region: String.t() | nil,
Expand Down Expand Up @@ -60,13 +57,23 @@ defmodule Realtime.Tenants.Janitor do
{:ok, state}
end

@table_name Realtime.Tenants.Connect
@syn_table :"syn_registry_by_name_Elixir.Realtime.Tenants.Connect"
@matchspec [{{:"$1"}, [], [:"$1"]}]
@syn_matchspec [
{{:"$1", :"$2", :"$3", :"$4", :"$5", Node.self()}, [], [:"$1"]}
]

@impl true
def handle_info(:delete_old_messages, state) do
Logger.info("Janitor started")
%{chunks: chunks, tasks: tasks} = state
all_tenants = :ets.select(@table_name, @matchspec)
connected_tenants = :ets.select(@syn_table, @syn_matchspec)

new_tasks =
:ets.select(@table_name, @matchspec)
MapSet.new(all_tenants ++ connected_tenants)
|> Enum.to_list()
|> Stream.chunk_every(chunks)
|> Stream.map(fn chunks ->
task =
Expand Down Expand Up @@ -117,7 +124,7 @@ defmodule Realtime.Tenants.Janitor do

defp perform_mantaince_tasks(tenants), do: Enum.map(tenants, &perform_mantaince_task/1)

defp perform_mantaince_task({tenant_external_id}) do
defp perform_mantaince_task(tenant_external_id) do
Logger.metadata(project: tenant_external_id, external_id: tenant_external_id)
Logger.info("Janitor starting realtime.messages cleanup")
:ets.delete(@table_name, tenant_external_id)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.19",
version: "2.34.20",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
44 changes: 43 additions & 1 deletion test/realtime/tenants/janitor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Realtime.Tenants.JanitorTest do
%{tenants: tenants}
end

test "cleans messages older than 72 hours and creates partitions from tenants that were active and untracks the user",
test "cleans messages older than 72 hours and creates partitions from tenants that were active and untracks the user and test tenant is connected",
%{
tenants: tenants
} do
Expand Down Expand Up @@ -91,6 +91,48 @@ defmodule Realtime.Tenants.JanitorTest do
end
end

test "cleans messages older than 72 hours and creates partitions from tenants that were active and untracks the user and test tenant has disconnected",
%{
tenants: tenants
} do
Realtime.Tenants.Connect.shutdown(hd(tenants).external_id)
Process.sleep(100)

utc_now = NaiveDateTime.utc_now()
limit = NaiveDateTime.add(utc_now, -72, :hour)

messages =
for days <- -5..0 do
inserted_at = NaiveDateTime.add(utc_now, days, :day)
Enum.map(tenants, &message_fixture(&1, %{inserted_at: inserted_at}))
end
|> List.flatten()
|> MapSet.new()

to_keep =
messages
|> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt))
|> MapSet.new()

with_mock Migrations, create_partitions: fn _ -> :ok end do
start_supervised!(Janitor)
Process.sleep(500)

current =
Enum.map(tenants, fn tenant ->
{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
{:ok, res} = Repo.all(conn, from(m in Message), Message)
res
end)
|> List.flatten()
|> MapSet.new()

assert MapSet.difference(current, to_keep) |> MapSet.size() == 0
assert_called(Migrations.create_partitions(:_))
assert :ets.tab2list(Connect) == []
end
end

test "logs error if fails to connect to tenant" do
extensions = [
%{
Expand Down

0 comments on commit 05d302e

Please sign in to comment.