Skip to content

Commit

Permalink
fix: refactor presence handler and fix tracking reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Feb 11, 2025
1 parent b4d89d5 commit 1b5adbb
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 168 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
**Environment Variables**

| Variable | Type | Description |
| ----------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| ------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| PORT | number | Port which you can connect your client/listeners |
| DB_HOST | string | Database host URL |
| DB_PORT | number | Database port |
Expand Down Expand Up @@ -159,7 +159,9 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| JANITOR_CLEANUP_CHILDREN_TIMEOUT | number | Timeout for each async task for janitor cleanup |
| JANITOR_CHUNK_SIZE | number | Number of tenants to process per chunk. Each chunk will be processed by a Task |
| METRICS_CLEANER_SCHEDULE_TIMER_IN_MS | number | Time in ms to run the Metric Cleaner task |

## WebSocket URL

The WebSocket URL is in the following format for local development: `ws://[external_id].localhost:4000/socket/websocket`

If you're using Supabase's hosted Realtime in production the URL is `wss://[project-ref].supabase.co/realtime/v1/websocket?apikey=[anon-token]&log_level=info&vsn=1.0.0"`
Expand Down Expand Up @@ -225,6 +227,7 @@ This is the list of operational codes that can help you understand your deployme
| UnableToFindCounter | Error when trying to find a counter to track rate limits for a tenant |
| UnhandledProcessMessage | Unhandled message received by a Realtime process |
| UnableToSetPolicies | We were not able to set policies for this connection |
| UnableToTrackPresence | Error when handling track presence for this socket |
| IncreaseConnectionPool | The number of connections you have set for Realtime are not enough to handle your current use case |
| RlsPolicyError | Error on RLS policy used for authorization |
| ConnectionInitializing | Database is initializing connection |
Expand Down
86 changes: 26 additions & 60 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

@impl true
def handle_info(
_any,
%{
Expand All @@ -210,58 +211,37 @@ defmodule RealtimeWeb.RealtimeChannel do
shutdown_response(socket, message)
end

@impl true

def handle_info(:sync_presence = msg, socket) do
PresenceHandler.track(msg, socket)
end

@impl true
def handle_info(%{event: "postgres_cdc_rls_down"}, socket) do
pg_sub_ref = postgres_subscribe()

{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end

@impl true
def handle_info(%{event: "postgres_cdc_down"}, socket) do
pg_sub_ref = postgres_subscribe()

{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end

@impl true
def handle_info(
%{event: type, payload: payload} = msg,
%{assigns: %{policies: policies}} = socket
%{event: "presence_diff"},
%{assigns: %{policies: %Policies{presence: %PresencePolicies{read: false}}}} = socket
) do
socket =
cond do
type == "presence_diff" and
match?(%Policies{broadcast: %PresencePolicies{read: false}}, policies) ->
Logger.warning("Presence message ignored")

socket

type != "presence_diff" and
match?(%Policies{broadcast: %BroadcastPolicies{read: false}}, policies) ->
Logger.warning("Broadcast message ignored")

socket

true ->
socket
|> count()
|> Logging.maybe_log_handle_info(msg)
Logger.warning("Presence message ignored")
{:noreply, socket}
end

push(socket, type, payload)
socket
end
def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do
Logger.warning("Broadcast message ignored")
{:noreply, socket}
end

def handle_info(%{event: type, payload: payload} = msg, socket) do
socket = socket |> count() |> Logging.maybe_log_handle_info(msg)
push(socket, type, payload)
{:noreply, socket}
end

@impl true
def handle_info(:postgres_subscribe, %{assigns: %{channel_name: channel_name}} = socket) do
%{
assigns: %{
Expand Down Expand Up @@ -308,7 +288,6 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end

@impl true
def handle_info(:confirm_token, %{assigns: %{pg_change_params: pg_change_params}} = socket) do
case confirm_token(socket) do
{:ok, claims, confirm_token_ref, _, _} ->
Expand All @@ -332,48 +311,39 @@ defmodule RealtimeWeb.RealtimeChannel do
{:stop, :shutdown, socket}
end

def handle_info(:sync_presence = msg, socket) do
PresenceHandler.call(msg, socket)
end

def handle_info(msg, socket) do
log_error("UnhandledSystemMessage", msg)
{:noreply, socket}
end

@impl true
def handle_in(
_,
_,
%{
assigns: %{
rate_counter: %{avg: avg},
limits: %{max_events_per_second: max}
}
} = socket
)

def handle_in("broadcast", payload, socket), do: BroadcastHandler.call(payload, socket)

def handle_in("presence", payload, socket), do: PresenceHandler.call(payload, socket)

def handle_in(_, _, %{assigns: %{rate_counter: %{avg: avg}, limits: %{max_events_per_second: max}}} = socket)
when avg > max do
message = "Too many messages per second"

shutdown_response(socket, message)
end

def handle_in(
"access_token",
%{"access_token" => refresh_token},
%{assigns: %{access_token: access_token}} = socket
)
def handle_in("access_token", %{"access_token" => refresh_token}, %{assigns: %{access_token: access_token}} = socket)
when refresh_token == access_token do
{:noreply, socket}
end

def handle_in(
"access_token",
%{"access_token" => refresh_token},
%{assigns: %{access_token: _access_token}} = socket
)
def handle_in("access_token", %{"access_token" => refresh_token}, %{assigns: %{access_token: _access_token}} = socket)
when is_nil(refresh_token) do
{:noreply, socket}
end

def handle_in("access_token", %{"access_token" => refresh_token}, socket)
when is_binary(refresh_token) do
def handle_in("access_token", %{"access_token" => refresh_token}, socket) when is_binary(refresh_token) do
%{
assigns: %{
access_token: access_token,
Expand Down Expand Up @@ -426,10 +396,6 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

def handle_in("broadcast", payload, socket), do: BroadcastHandler.call(payload, socket)

def handle_in("presence", payload, socket), do: PresenceHandler.call(payload, socket)

def handle_in(type, payload, socket) do
socket = count(socket)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
{:ok, socket} ->
{:noreply, socket}

{:error, :increase_connection_pool} ->
log_error("IncreaseConnectionPool", "Please increase your connection pool size")
{:noreply, socket}

{:error, error} ->
log_error("UnableToSetPolicies", error)
{:noreply, socket}
Expand Down
112 changes: 69 additions & 43 deletions lib/realtime_web/channels/realtime_channel/presence_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do

import Phoenix.Socket, only: [assign: 3]
import Phoenix.Channel, only: [push: 3]
import Realtime.Logs

alias Phoenix.Socket
alias Phoenix.Tracker.Shard
Expand All @@ -17,23 +18,28 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
alias RealtimeWeb.Presence
alias RealtimeWeb.RealtimeChannel.Logging

@spec call(map(), Phoenix.Socket.t()) ::
@spec call(map() | :sync_presence, Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()} | {:reply, :error | :ok, Phoenix.Socket.t()}
def call(
%{"event" => event} = payload,
%{assigns: %{is_new_api: true, presence_key: _, tenant_topic: _}} = socket
) do
socket = count(socket)
{result, socket} = handle_presence_event(event, payload, socket)
def call(%{"event" => event} = payload, socket) do
event = String.downcase(event)

case handle_presence_event(event, payload, socket) do
{:ok, socket} ->
{:noreply, socket}

{:reply, result, socket}
{:error, socket} ->
{:noreply, socket}
end
end

def call(_payload, socket) do
def call(:sync_presence, %{assigns: %{private?: false}} = socket) do
%{assigns: %{tenant_topic: topic}} = socket
socket = count(socket)
push(socket, "presence_state", presence_dirty_list(topic))
{:noreply, socket}
end

def track(msg, %{assigns: assigns} = socket) do
def call(:sync_presence, %{assigns: assigns} = socket) do
%{tenant_topic: topic, policies: policies} = assigns

case policies do
Expand All @@ -42,54 +48,74 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
{:noreply, socket}

_ ->
socket = Logging.maybe_log_handle_info(socket, msg)
socket = Logging.maybe_log_handle_info(socket, :sync_presence)
push(socket, "presence_state", presence_dirty_list(topic))

{:noreply, socket}
end
end

defp handle_presence_event(event, payload, socket) do
%{
assigns: %{
presence_key: presence_key,
tenant_topic: tenant_topic
}
} = socket
def call(_payload, socket), do: {:noreply, socket}

authorization_context = socket.assigns.authorization_context
db_conn = socket.assigns.db_conn
defp handle_presence_event("track", payload, %{assigns: %{private?: false}} = socket) do
track(socket, payload)
end

{:ok, socket} = run_authorization_check(socket, db_conn, authorization_context)
defp handle_presence_event(
"track",
payload,
%{assigns: %{private?: true, policies: %Policies{presence: %PresencePolicies{write: nil}}}} = socket
) do
%{assigns: %{db_conn: db_conn, authorization_context: authorization_context}} = socket

%{assigns: %{policies: policies}} = socket
case run_authorization_check(socket, db_conn, authorization_context) do
{:ok, socket} ->
handle_presence_event("track", payload, socket)

cond do
match?(%Policies{presence: %PresencePolicies{write: false}}, policies) ->
Logger.info("Presence message ignored on #{tenant_topic}")
{:ok, socket}
{:error, error} ->
log_error("UnableToSetPolicies", error)
{:error, socket}
end
end

String.downcase(event) == "track" ->
payload = Map.get(payload, "payload", %{})
defp handle_presence_event(
"track",
payload,
%{assigns: %{private?: true, policies: %Policies{presence: %PresencePolicies{write: true}}}} = socket
) do
track(socket, payload)
end

case Presence.track(self(), tenant_topic, presence_key, payload) do
{:ok, _} ->
{:ok, socket}
defp handle_presence_event(
"track",
_,
%{assigns: %{private?: true, policies: %Policies{presence: %PresencePolicies{write: false}}}} = socket
) do
{:error, socket}
end

{:error, {:already_tracked, _, _, _}} ->
case Presence.update(self(), tenant_topic, presence_key, payload) do
{:ok, _} -> {:ok, socket}
{:error, _} -> {:error, socket}
end
defp handle_presence_event("untrack", _, socket) do
%{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket
{Presence.untrack(self(), tenant_topic, presence_key), socket}
end

{:error, _} ->
{:error, socket}
end
defp track(socket, payload) do
%{assigns: %{presence_key: presence_key, tenant_topic: tenant_topic}} = socket
socket = count(socket)
payload = Map.get(payload, "payload", %{})

String.downcase(event) == "untrack" ->
{Presence.untrack(self(), tenant_topic, presence_key), socket}
case Presence.track(self(), tenant_topic, presence_key, payload) do
{:ok, _} ->
{:ok, socket}

{:error, {:already_tracked, pid, _, _}} ->
case Presence.update(pid, tenant_topic, presence_key, payload) do
{:ok, _} -> {:ok, socket}
{:error, _} -> {:error, socket}
end

true ->
{:error, error} ->
log_error("UnableToTrackPresence", error)
{:error, socket}
end
end
Expand All @@ -111,7 +137,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
end

defp run_authorization_check(
%Socket{assigns: %{policies: %{presence: %PresencePolicies{write: nil}}}} = socket,
%Socket{assigns: %{private?: true, policies: %{presence: %PresencePolicies{write: nil}}}} = socket,
db_conn,
authorization_context
) do
Expand Down
Loading

0 comments on commit 1b5adbb

Please sign in to comment.