Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (sync-service): let Postgres handle user-provided root table name #1776

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion packages/sync-service/lib/electric/postgres/inspector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,61 @@ defmodule Electric.Postgres.Inspector do
array_type: String.t()
}

@callback load_relation(String.t(), opts :: term()) ::
{:ok, relation()} | {:error, String.t()}

@callback clean_relation(relation(), opts :: term()) :: true

@callback load_column_info(relation(), opts :: term()) ::
{:ok, [column_info()]} | :table_not_found

@callback clean_column_info(relation(), opts :: term()) :: true

@callback clean(relation(), opts :: term()) :: true

@type inspector :: {module(), opts :: term()}

@doc """
Expects the table name provided by the user
and validates that the table exists and returns the relation.

The table name can be quoted or unquoted and can optionally be qualified,
e.g. `users` would return `{"public", "users"}`,
`usErs` would return `{"public", "users"}`,
`"Users"` would return `{"public", "Users"}`,
`some_schema.users` would return `{"some_schema", "users"}`.
"""
@spec load_relation(String.t(), inspector()) :: {:ok, relation()} | {:error, String.t()}
def load_relation(table, {module, opts}),
do: module.load_relation(table, opts)

@doc """
Clean up relation information about a given table using a provided inspector.
"""
@spec clean_relation(relation(), inspector()) :: true
def clean_relation(rel, {module, opts}),
do: module.clean_relation(rel, opts)

@doc """
Load column information about a given table using a provided inspector.
"""
@spec load_column_info(relation(), inspector()) :: {:ok, [column_info()]} | :table_not_found
def load_column_info(relation, {module, opts}), do: module.load_column_info(relation, opts)
def load_column_info(relation, {module, opts}) do
module.load_column_info(relation, opts)
end

@doc """
Clean up column information about a given table using a provided inspector.
"""
@spec clean_column_info(relation(), inspector()) :: true
def clean_column_info(relation, {module, opts}), do: module.clean_column_info(relation, opts)

@doc """
Clean up all information about a given relation using a provided inspector.
"""
@spec clean(relation(), inspector()) :: true
def clean(relation, {module, opts}), do: module.clean_column_info(relation, opts)

@doc """
Get columns that should be considered a PK for table. If the table
has no PK, then we're considering all columns as identifying.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
defmodule Electric.Postgres.Inspector.DirectInspector do
@behaviour Electric.Postgres.Inspector

@doc """
Returns the PG relation from the table name.
"""
def load_relation(table, conn) do
# The extra cast from $1 to text is needed because of Postgrex' OID type encoding
# see: https://github.com/elixir-ecto/postgrex#oid-type-encoding
query = """
SELECT nspname, relname
FROM pg_class
JOIN pg_namespace ON relnamespace = pg_namespace.oid
WHERE
relkind = 'r' AND
pg_class.oid = $1::text::regclass
"""

case Postgrex.query(conn, query, [table]) do
{:ok, result} ->
# We expect exactly one row because the query didn't fail
# so the relation exists since we could cast it to a regclass
info = Enum.at(result.rows, 0)
{:ok, {Enum.at(info, 0), Enum.at(info, 1)}}

{:error, err} ->
{:error, Exception.message(err)}
end
end

def clean_relation(_, _), do: true

@doc """
Load table information (refs) from the database
"""
Expand Down Expand Up @@ -29,6 +58,7 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
result = Postgrex.query!(conn, query, [tbl, namespace])

if Enum.empty?(result.rows) do
# Fixme: this is not necessarily true. The table might exist but have no columns.
:table_not_found
else
columns = Enum.map(result.columns, &String.to_atom/1)
Expand All @@ -38,4 +68,6 @@ defmodule Electric.Postgres.Inspector.DirectInspector do
end

def clean_column_info(_, _), do: true

def clean(_, _), do: true
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,46 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
@behaviour Electric.Postgres.Inspector

@default_pg_info_table :pg_info_table
@default_pg_relation_table :pg_relation_table

## Public API

def start_link(opts),
do:
GenServer.start_link(
__MODULE__,
Map.new(opts) |> Map.put_new(:pg_info_table, @default_pg_info_table),
Map.new(opts)
|> Map.put_new(:pg_info_table, @default_pg_info_table)
|> Map.put_new(:pg_relation_table, @default_pg_relation_table),
name: Access.get(opts, :name, __MODULE__)
)

@impl Electric.Postgres.Inspector
def load_relation(table, opts) do
case relation_from_ets(table, opts) do
:not_found ->
GenServer.call(opts[:server], {:load_relation, table})

rel ->
{:ok, rel}
end
end

@impl Electric.Postgres.Inspector
def clean_relation(rel, opts_or_state) do
pg_relation_ets_table =
Access.get(opts_or_state, :pg_relation_table, @default_pg_relation_table)

pg_info_ets_table = Access.get(opts_or_state, :pg_info_table, @default_pg_info_table)

# Delete all tables that are associated with the relation
tables_from_ets(rel, opts_or_state)
|> Enum.each(fn table -> :ets.delete(pg_info_ets_table, {table, :table_to_relation}) end)

# Delete the relation itself
:ets.delete(pg_relation_ets_table, {rel, :relation_to_table})
end

@impl Electric.Postgres.Inspector
def load_column_info({_namespace, _table_name} = table, opts) do
case column_info_from_ets(table, opts) do
Expand All @@ -36,20 +65,63 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
:ets.delete(ets_table, {table, :columns})
end

@impl Electric.Postgres.Inspector
def clean(relation, opts_or_state) do
clean_column_info(relation, opts_or_state)
clean_relation(relation, opts_or_state)
end

## Internal API

@impl GenServer
def init(opts) do
pg_info_table = :ets.new(opts.pg_info_table, [:named_table, :public, :set])
pg_relation_table = :ets.new(opts.pg_relation_table, [:named_table, :public, :bag])

state = %{
pg_info_table: pg_info_table,
pg_relation_table: pg_relation_table,
pg_pool: opts.pool
}

{:ok, state}
end

@impl GenServer
def handle_call({:load_relation, table}, _from, state) do
# This serves as a write-through cache for caching
# the namespace and tablename as they occur in PG.
# Note that if users create shapes for the same table but spelled differently,
# e.g. `~s|public.users|`, `~s|users|`, `~s|Users|`, and `~s|USERS|`
# then there will be 4 entries in the cache each of which maps to `{~s|public|, ~s|users|}`.
# If they create a shape for a different table `~s|"Users"|`, then there will be another entry
# in ETS for `~s|"Users"|` that maps to `{~s|public|, ~s|"Users"|}`.
case relation_from_ets(table, state) do
:not_found ->
case DirectInspector.load_relation(table, state.pg_pool) do
{:error, err} ->
{:reply, {:error, err}, state}

{:ok, relation} ->
# We keep the mapping in both directions:
# - Forward: user-provided table name -> PG relation (many-to-one)
# e.g. `~s|users|` -> `{"public", "users"}`
# `~s|USERS|` -> `{"public", "users"}`
# - Backward: and PG relation -> user-provided table names (one-to-many)
# e.g. `{"public", "users"}` -> `[~s|users|, ~s|USERS|]`
#
# The forward direction allows for efficient lookup (based on user-provided table name)
# the backward direction allows for efficient cleanup (based on PG relation)
:ets.insert(state.pg_info_table, {{table, :table_to_relation}, relation})
:ets.insert(state.pg_relation_table, {{relation, :relation_to_table}, table})
{:reply, {:ok, relation}, state}
end

relation ->
{:reply, {:ok, relation}, state}
end
end

@impl GenServer
def handle_call({:load_column_info, table}, _from, state) do
case column_info_from_ets(table, state) do
Expand All @@ -71,6 +143,21 @@ defmodule Electric.Postgres.Inspector.EtsInspector do
e -> {:reply, {:error, e, __STACKTRACE__}, state}
end

@pg_rel_position 2
defp relation_from_ets(table, opts_or_state) do
ets_table = Access.get(opts_or_state, :pg_info_table, @default_pg_info_table)

:ets.lookup_element(ets_table, {table, :table_to_relation}, @pg_rel_position, :not_found)
end

@pg_table_idx 1
defp tables_from_ets(relation, opts_or_state) do
ets_table = Access.get(opts_or_state, :pg_relation_table, @default_pg_relation_table)

:ets.lookup(ets_table, {relation, :relation_to_table})
|> Enum.map(&elem(&1, @pg_table_idx))
end

@column_info_position 2
defp column_info_from_ets(table, opts_or_state) do
ets_table = Access.get(opts_or_state, :pg_info_table, @default_pg_info_table)
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ defmodule Electric.ShapeCache do
# because the table name may have changed in the new relation
# if there is no old relation, we use the new one
rel = old_rel || relation
inspector.clean_column_info({rel.schema, rel.table}, inspector_opts)
inspector.clean({rel.schema, rel.table}, inspector_opts)
end

{:noreply, [], state}
Expand Down
38 changes: 19 additions & 19 deletions packages/sync-service/lib/electric/shapes/shape.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Electric.Shapes.Shape do
alias Electric.Replication.Eval.Parser
alias Electric.Replication.Eval.Runner
alias Electric.Replication.Changes
alias Electric.Utils

@enforce_keys [:root_table]
defstruct [:root_table, :table_info, :where]
Expand Down Expand Up @@ -64,8 +63,9 @@ defmodule Electric.Shapes.Shape do
def new(table, opts) do
opts = NimbleOptions.validate!(opts, @shape_schema)

with {:ok, table} <- validate_table(table),
{:ok, column_info, pk_cols} <- load_column_info(table, Access.fetch!(opts, :inspector)),
with inspector <- Access.fetch!(opts, :inspector),
{:ok, table} <- validate_table(table, inspector),
{:ok, column_info, pk_cols} <- load_column_info(table, inspector),
refs = Inspector.columns_to_expr(column_info),
{:ok, where} <- maybe_parse_where_clause(Access.get(opts, :where), refs) do
{:ok,
Expand Down Expand Up @@ -97,22 +97,22 @@ defmodule Electric.Shapes.Shape do
end
end

defp validate_table(definition) when is_binary(definition) do
regex =
~r/^((?<schema>([a-z_][a-zA-Z0-9_]*|"(""|[^"])+"))\.)?(?<table>([a-z_][a-zA-Z0-9_]*|"(""|[^"])+"))$/

case Regex.run(regex, definition, capture: :all_names) do
["", table_name] when table_name != "" ->
table_name = Utils.parse_quoted_name(table_name)
{:ok, {"public", table_name}}

[schema_name, table_name] when table_name != "" ->
schema_name = Utils.parse_quoted_name(schema_name)
table_name = Utils.parse_quoted_name(table_name)
{:ok, {schema_name, table_name}}

_ ->
{:error, ["table name does not match expected format"]}
defp validate_table(table, inspector) when is_binary(table) do
case Inspector.load_relation(table, inspector) do
{:error, err} ->
case Regex.run(~r/.+ relation "(?<name>.+)" does not exist/, err, capture: :all_names) do
[table_name] ->
{:error,
[
~s|Table "#{table_name}" does not exist. If the table name contains capitals or special characters you must quote it.|
]}

_ ->
{:error, [err]}
end

{:ok, rel} ->
{:ok, rel}
end
end

Expand Down
7 changes: 7 additions & 0 deletions packages/sync-service/lib/electric/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ defmodule Electric.Utils do

@doc """
Parses quoted names.
Lowercases unquoted names to match Postgres' case insensitivity.

## Examples
iex> parse_quoted_name("foo")
Expand All @@ -211,6 +212,12 @@ defmodule Electric.Utils do

iex> parse_quoted_name(~S|"fo""o"|)
~S|fo"o|

iex> parse_quoted_name(~S|"FooBar"|)
~S|FooBar|

iex> parse_quoted_name(~S|FooBar|)
~S|FooBar|
"""
def parse_quoted_name(str) do
if String.first(str) == ~s(") && String.last(str) == ~s(") do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ defmodule Electric.Plug.DeleteShapePlugTest do
def load_column_info({"public", "users"}, _),
do: {:ok, @test_shape.table_info[{"public", "users"}][:columns]}

def load_relation(tbl, _),
do: Support.StubInspector.load_relation(tbl, nil)

setup do
start_link_supervised!({Registry, keys: :duplicate, name: @registry})
:ok
Expand Down Expand Up @@ -70,7 +73,9 @@ defmodule Electric.Plug.DeleteShapePlugTest do
assert conn.status == 400

assert Jason.decode!(conn.resp_body) == %{
"root_table" => ["table name does not match expected format"]
"root_table" => [
"invalid name syntax"
]
}
end

Expand Down
6 changes: 5 additions & 1 deletion packages/sync-service/test/electric/plug/router_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ defmodule Electric.Plug.RouterTest do

assert %{status: 400} = conn

assert %{"root_table" => ["table not found"]} = Jason.decode!(conn.resp_body)
assert %{
"root_table" => [
~s|Table "nonexistent" does not exist. If the table name contains capitals or special characters you must quote it.|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice error message!

]
} = Jason.decode!(conn.resp_body)
end

@tag additional_fields: "num INTEGER NOT NULL"
Expand Down
Loading
Loading