Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication origin support #204

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/pgsync/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def slop_options
o.boolean "--in-batches", "sync in batches", default: false, help: false
o.integer "--batch-size", "batch size", default: 10000, help: false
o.float "--sleep", "time to sleep between batches", default: 0, help: false
o.string "--replication-origin", "replication origin", help: false

o.separator ""
o.separator "Other commands:"
Expand Down
12 changes: 12 additions & 0 deletions lib/pgsync/data_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ def triggers(table)
execute(query, [quote_ident_full(table)])
end

def set_replication_origin(origin)
query = <<~SQL
SELECT
CASE
WHEN EXISTS (SELECT 1 FROM pg_replication_origin WHERE roname = $1) THEN null
ELSE pg_replication_origin_create($1)
END,
pg_replication_origin_session_setup($1);
SQL
execute(query, [origin])
end

def conn
@conn ||= begin
begin
Expand Down
2 changes: 1 addition & 1 deletion lib/pgsync/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def perform
end

# merge other config
[:to_safe, :exclude, :schemas].each do |opt|
[:to_safe, :exclude, :schemas, :replication_origin].each do |opt|
opts[opt] ||= config[opt.to_s]
end

Expand Down
4 changes: 3 additions & 1 deletion lib/pgsync/table_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ def run_tasks(tasks, &block)
Parallel.each(tasks, **options) do |task|
source.reconnect_if_needed
destination.reconnect_if_needed

if opts[:replication_origin]
destination.set_replication_origin(opts[:replication_origin])
end
task.perform
end
end
Expand Down
6 changes: 6 additions & 0 deletions test/sync_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,10 @@ def test_disable_integrity_v2
assert_equal [], conn2.exec("SELECT * FROM posts ORDER BY id").to_a
assert_equal [{"post_id" => 1}], conn2.exec("SELECT post_id FROM comments ORDER BY post_id").to_a
end

def test_replication_origin
insert(conn1, "posts", [{"id" => 1}])
assert_works "posts --replication-origin=test --debug", config: true
assert_equal [{"exists" => 1}], conn2.exec("SELECT 1 AS exists FROM pg_replication_origin WHERE roname = 'test'").to_a
end
end
Loading