From 7f3d6366556993447a2612c1cab4e20efcf5d3a1 Mon Sep 17 00:00:00 2001 From: Pyry Liukas Date: Thu, 7 Dec 2023 08:33:03 +0200 Subject: [PATCH 1/2] replication origin support --- lib/pgsync/client.rb | 1 + lib/pgsync/data_source.rb | 19 +++++++++++++++++++ lib/pgsync/sync.rb | 2 +- lib/pgsync/table_sync.rb | 4 +++- test/sync_test.rb | 7 +++++++ 5 files changed, 31 insertions(+), 2 deletions(-) diff --git a/lib/pgsync/client.rb b/lib/pgsync/client.rb index 97cd341..2cac594 100644 --- a/lib/pgsync/client.rb +++ b/lib/pgsync/client.rb @@ -101,6 +101,7 @@ def slop_options o.boolean "--in-batches", "sync in batches", default: false, help: false o.integer "--batch-size", "batch size", default: 10000, help: false o.float "--sleep", "time to sleep between batches", default: 0, help: false + o.string "--replication-origin", "replication origin", help: false o.separator "" o.separator "Other commands:" diff --git a/lib/pgsync/data_source.rb b/lib/pgsync/data_source.rb index 5ba8ecc..651cbc0 100644 --- a/lib/pgsync/data_source.rb +++ b/lib/pgsync/data_source.rb @@ -100,6 +100,25 @@ def triggers(table) execute(query, [quote_ident_full(table)]) end + def set_replication_origin(origin) + query = <<~SQL + SELECT + CASE + WHEN EXISTS (SELECT 1 FROM pg_replication_origin WHERE roname = $1) THEN null + ELSE pg_replication_origin_create($1) + END; + SQL + execute(query, [origin]) + query = <<~SQL + SELECT + CASE + WHEN pg_replication_origin_session_is_setup() THEN null + ELSE pg_replication_origin_session_setup($1) + END; + SQL + execute(query, [origin]) + end + def conn @conn ||= begin begin diff --git a/lib/pgsync/sync.rb b/lib/pgsync/sync.rb index f580399..291529a 100644 --- a/lib/pgsync/sync.rb +++ b/lib/pgsync/sync.rb @@ -19,7 +19,7 @@ def perform end # merge other config - [:to_safe, :exclude, :schemas].each do |opt| + [:to_safe, :exclude, :schemas, :replication_origin].each do |opt| opts[opt] ||= config[opt.to_s] end diff --git a/lib/pgsync/table_sync.rb b/lib/pgsync/table_sync.rb index c7a8ec9..6793733 100644 --- a/lib/pgsync/table_sync.rb +++ b/lib/pgsync/table_sync.rb @@ -248,7 +248,9 @@ def run_tasks(tasks, &block) Parallel.each(tasks, **options) do |task| source.reconnect_if_needed destination.reconnect_if_needed - + if opts[:replication_origin] + destination.set_replication_origin(opts[:replication_origin]) + end task.perform end end diff --git a/test/sync_test.rb b/test/sync_test.rb index 0dc1623..edfe7f0 100644 --- a/test/sync_test.rb +++ b/test/sync_test.rb @@ -175,4 +175,11 @@ def test_disable_integrity_v2 assert_equal [], conn2.exec("SELECT * FROM posts ORDER BY id").to_a assert_equal [{"post_id" => 1}], conn2.exec("SELECT post_id FROM comments ORDER BY post_id").to_a end + + def test_replication_origin + insert(conn1, "posts", [{"id" => 1}]) + assert_error "Sync failed for 1 table: posts", "posts", config: true + assert_works "posts --replication-origin=test --debug", config: true + assert_equal [{"exists" => 1}], conn2.exec("SELECT 1 AS exists FROM pg_replication_origin WHERE roname = 'test'").to_a + end end From a6c33c8e038ec8c043e4aa0e855804483f05a5fd Mon Sep 17 00:00:00 2001 From: Pyry Liukas Date: Thu, 7 Dec 2023 08:55:23 +0200 Subject: [PATCH 2/2] test_replication_origin fixed, optimized sql --- lib/pgsync/data_source.rb | 11 ++--------- test/sync_test.rb | 1 - 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/lib/pgsync/data_source.rb b/lib/pgsync/data_source.rb index 651cbc0..d99eff8 100644 --- a/lib/pgsync/data_source.rb +++ b/lib/pgsync/data_source.rb @@ -106,15 +106,8 @@ def set_replication_origin(origin) CASE WHEN EXISTS (SELECT 1 FROM pg_replication_origin WHERE roname = $1) THEN null ELSE pg_replication_origin_create($1) - END; - SQL - execute(query, [origin]) - query = <<~SQL - SELECT - CASE - WHEN pg_replication_origin_session_is_setup() THEN null - ELSE pg_replication_origin_session_setup($1) - END; + END, + pg_replication_origin_session_setup($1); SQL execute(query, [origin]) end diff --git a/test/sync_test.rb b/test/sync_test.rb index edfe7f0..8cc1c79 100644 --- a/test/sync_test.rb +++ b/test/sync_test.rb @@ -178,7 +178,6 @@ def test_disable_integrity_v2 def test_replication_origin insert(conn1, "posts", [{"id" => 1}]) - assert_error "Sync failed for 1 table: posts", "posts", config: true assert_works "posts --replication-origin=test --debug", config: true assert_equal [{"exists" => 1}], conn2.exec("SELECT 1 AS exists FROM pg_replication_origin WHERE roname = 'test'").to_a end