Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky test with DataWriter (part 2) #376

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions sharding/test/trivial_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func setupSingleTableDatabase(f *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) {
testhelpers.SeedInitialData(sourceDB, "gftest", "table1", 100)
testhelpers.SeedInitialData(sourceDB, "gftest", "table1", 1000)
testhelpers.SeedInitialData(targetDB, "gftest", "table1", 0)

testhelpers.AddTenantID(sourceDB, "gftest", "table1", 3)
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestSelectiveCopyDataWithoutAnyWritesToSource(t *testing.T) {
assert.Equal(t, 0, count)

rows := testcase.AssertQueriesHaveEqualResult("SELECT * FROM gftest.table1 WHERE tenant_id = 2")
assert.Equal(t, 33, len(rows))
assert.Equal(t, 333, len(rows))
}

func TestSelectiveCopyDataWithInsertLoadOnOtherTenants(t *testing.T) {
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestSelectiveCopyDataWithInsertLoadOnOtherTenants(t *testing.T) {
assert.Equal(t, 0, count)

rows := testcase.AssertQueriesHaveEqualResult("SELECT * FROM gftest.table1 WHERE tenant_id = 2")
assert.Equal(t, 33, len(rows))
assert.Equal(t, 333, len(rows))
}

func TestSelectiveCopyDataWithInsertLoadOnAllTenants(t *testing.T) {
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestSelectiveCopyDataWithInsertLoadOnAllTenants(t *testing.T) {
assert.Equal(t, 0, count)

rows := testcase.AssertQueriesHaveEqualResult("SELECT * FROM gftest.table1 WHERE tenant_id = 2")
assert.True(t, len(rows) > 33)
assert.True(t, len(rows) > 333)
}

type ChangeShardingKeyDataWriter struct {
Expand Down
53 changes: 31 additions & 22 deletions test/helpers/data_writer_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def initialize(db_config,
@started = false
@stop_requested = false

@start_cmd = Queue.new
start_synchronized_datawriter_threads

@logger = logger
if @logger.nil?
@logger = Logger.new(STDOUT)
Expand All @@ -54,29 +57,9 @@ def initialize(db_config,

def start(&on_write)
raise "Cannot start DataWriter multiple times. Use a new instance instead " if @started
@started = true
@number_of_writers.times do |i|
@threads << Thread.new do
@logger.info("starting data writer thread #{i}")

n = 0
begin
connection = Mysql2::Client.new(@db_config)

until @stop_requested do
write_data(connection, &on_write)
# Kind of makes the following race condition a bit better...
# https://github.com/Shopify/ghostferry/issues/280
sleep(0.03) if n > 10
n += 1
end
ensure
connection.close
end

@logger.info("stopped data writer thread #{i} with a total of #{n} data writes")
end
end
@number_of_writers.times { @start_cmd << on_write }
@started = true
end

def stop_and_join
Expand Down Expand Up @@ -143,5 +126,31 @@ def random_real_id(connection, table)
raise "No rows in the database?" if result.first.nil?
result.first["id"]
end

private

def start_synchronized_datawriter_threads
@number_of_writers.times do |i|
@threads << Thread.new do
connection = Mysql2::Client.new(@db_config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we still close the connection after the stop is requested?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I forgot to add it back here after an experiment with the ConnectionPool gem (which I decided was overkill for this test setup).

@logger.info("data writer thread in wait mode #{i}")
on_write = @start_cmd.pop
@logger.info("starting data writer thread #{i}")

n = 0
until @stop_requested do
write_data(connection, &on_write)
n += 1
# Kind of makes the following race condition a bit better...
# https://github.com/Shopify/ghostferry/issues/280
sleep(0.03)
end

@logger.info("stopped data writer thread #{i} with a total of #{n} data writes")
ensure
connection&.close
end
end
end
end
end
2 changes: 1 addition & 1 deletion test/integration/interrupt_resume_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def test_interrupt_resume_inline_verifier_with_datawriter
batches_written = 0
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
batches_written += 1
if batches_written >= 2
if batches_written >= 5
ghostferry.term_and_wait_for_exit
end
end
Expand Down
3 changes: 1 addition & 2 deletions testhelpers/data_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"math/rand"
"sync"

sql "github.com/Shopify/ghostferry/sqlwrapper"

sq "github.com/Masterminds/squirrel"
sql "github.com/Shopify/ghostferry/sqlwrapper"
)

var dataletters = []rune("abcdefghijklmnopqrstuvwxyz")
Expand Down
Loading