Skip to content

Commit

Permalink
[#26119] DocDB: Fix tablet lock order while creating vector index
Browse files Browse the repository at this point in the history
Summary:
In master code we should acquire locks on tablets in the order of increasing ids.
To guarantee that tablets are always locked in the same order to avoid lock order inversion.

When vector index is created locks are acquired in the order of partitions.
Fixed by using correct lock order.
Jira: DB-15446

Test Plan: ./yb_build.sh tsan PgIndexBackfillTest.VectorIndex -n 40

Reviewers: zdrudi

Reviewed By: zdrudi

Subscribers: ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D42068
  • Loading branch information
spolitov committed Feb 22, 2025
1 parent d3d0528 commit 0b4622e
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 91 deletions.
2 changes: 1 addition & 1 deletion src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4224,7 +4224,7 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
} else {
// Adding a table to an existing colocation tablet.
if (is_vector_index) {
tablets = VERIFY_RESULT(indexed_table->GetTablets());
tablets = VERIFY_RESULT(indexed_table->GetTablets(GetTabletsMode::kOrderByTabletId));
} else {
auto tablet = tablegroup ?
tablegroup->tablet() :
Expand Down
11 changes: 11 additions & 0 deletions src/yb/util/atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,17 @@ T AddFetch(std::atomic<T>* atomic, const D& delta, std::memory_order memory_orde
return atomic->fetch_add(delta, memory_order) + delta;
}

template <class T>
bool MakeAtLeast(std::atomic<T>& atomic, T new_value) {
auto old_value = atomic.load();
while (old_value < new_value) {
if (atomic.compare_exchange_strong(old_value, new_value)) {
return true;
}
}
return false;
}

// ------------------------------------------------------------------------------------------------
// A utility for testing if an atomic is lock-free.

Expand Down
211 changes: 121 additions & 90 deletions src/yb/yql/pgwrapper/pg_index_backfill-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "yb/util/countdown_latch.h"
#include "yb/util/format.h"
#include "yb/util/monotime.h"
#include "yb/util/scope_exit.h"
#include "yb/util/status_format.h"
#include "yb/util/string_util.h"
#include "yb/util/test_thread_holder.h"
Expand Down Expand Up @@ -2572,109 +2573,143 @@ TEST_F_EX(PgIndexBackfillTest, ConcurrentDelete, PgIndexBackfill1kRowsPerSec) {
thread_holder_.JoinAll();
}

TEST_F(PgIndexBackfillTest, VectorIndex) {
constexpr int kBig = 100000000;
struct VectorIndexWriter {
static constexpr int kBig = 100000000;

std::atomic<int> counter = 0;
std::atomic<int> extra_values_counter = kBig * 2;
std::atomic<CoarseTimePoint> last_write;
std::atomic<MonoDelta> max_time_without_inserts = MonoDelta::FromNanoseconds(0);
std::atomic<bool> failure = false;

void Perform(PGConn& conn) {
std::vector<int> values;
for (int i = RandomUniformInt(3, 6); i > 0; --i) {
values.push_back(++counter);
}
size_t keep_values = values.size();
for (int i = RandomUniformInt(0, 2); i > 0; --i) {
values.push_back(++extra_values_counter);
}
bool use_2_steps = RandomUniformBool();

int offset = use_2_steps ? kBig : 0;
ASSERT_NO_FATALS(Insert(conn, values, offset));
if (use_2_steps || keep_values != values.size()) {
ASSERT_NO_FATALS(UpdateAndDelete(conn, values, keep_values));
}
}

void Insert(PGConn& conn, const std::vector<int>& values, int offset) {
for (;;) {
ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
bool failed = false;
for (auto value : values) {
auto res = conn.ExecuteFormat(
"INSERT INTO test VALUES ($0, '[$1.0]')", value, value + offset);
if (!res.ok()) {
ASSERT_OK(conn.RollbackTransaction());
LOG(INFO) << "Insert " << value << " failed: " << res;
ASSERT_STR_CONTAINS(res.message().ToBuffer(), "schema version mismatch");
failed = true;
break;
}
}
if (!failed) {
ASSERT_OK(conn.CommitTransaction());
auto now = CoarseMonoClock::Now();
auto prev_last_write = last_write.exchange(now);
if (prev_last_write != CoarseTimePoint()) {
MonoDelta new_value(now - prev_last_write);
if (MakeAtLeast(max_time_without_inserts, new_value)) {
LOG(INFO) << "Update max time without inserts: " << new_value;
}
}
std::this_thread::sleep_for(100ms);
break;
}
}
}

void UpdateAndDelete(PGConn& conn, const std::vector<int>& values, size_t keep_values) {
for (;;) {
ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
bool failed = false;
for (size_t i = 0; i != values.size(); ++i) {
auto value = values[i];
Status res;
if (i < keep_values) {
res = conn.ExecuteFormat(
"UPDATE test SET embedding = '[$0.0]' WHERE id = $0", value);
} else {
res = conn.ExecuteFormat("DELETE FROM test WHERE id = $0", value);
}
if (!res.ok()) {
ASSERT_OK(conn.RollbackTransaction());
LOG(INFO) <<
(i < keep_values ? "Update " : "Delete " ) << value << " failed: " << res;
ASSERT_STR_CONTAINS(res.message().ToBuffer(), "schema version mismatch");
failed = true;
break;
}
}
if (!failed) {
ASSERT_OK(conn.CommitTransaction());
std::this_thread::sleep_for(100ms);
break;
}
}
}

void WaitWritten(int num_rows) {
auto limit = counter.load() + num_rows;
while (counter.load() < limit && !failure) {
std::this_thread::sleep_for(10ms);
}
}

void Verify(PGConn& conn) {
for (int i = 2; i < counter.load(); ++i) {
auto rows = ASSERT_RESULT(conn.FetchAllAsString(Format(
"SELECT id FROM test ORDER BY embedding <-> '[$0]' LIMIT 3", i * 1.0 - 0.01)));
ASSERT_EQ(rows, Format("$0; $1; $2", i, i - 1, i + 1));
}
}
};

TEST_F(PgIndexBackfillTest, VectorIndex) {
ASSERT_OK(conn_->Execute("CREATE EXTENSION vector"));
ASSERT_OK(conn_->ExecuteFormat(
"CREATE TABLE test (id INT PRIMARY KEY, embedding vector(1))"));
TestThreadHolder thread_holder;
std::atomic<int> counter = 0;
std::atomic<int> extra_values_counter = kBig * 2;
std::atomic<CoarseTimePoint> last_write;
MonoDelta max_time_without_inserts = MonoDelta::FromNanoseconds(0);
VectorIndexWriter writer;
for (int i = 0; i != 8; ++i) {
thread_holder.AddThreadFunctor(
[this, &stop_flag = thread_holder.stop_flag(), &counter, &last_write, &extra_values_counter,
&max_time_without_inserts] {
[this, &stop_flag = thread_holder.stop_flag(), &writer] {
bool done = false;
auto se = ScopeExit([&done, &writer] {
if (!done) {
writer.failure = true;
}
});
auto conn = ASSERT_RESULT(Connect());
while (!stop_flag.load()) {
std::vector<int> values;
for (int i = RandomUniformInt(1, 4); i-- > 0;) {
values.push_back(++counter);
}
size_t keep_values = values.size();
for (int i = RandomUniformInt(0, 2); i-- > 0;) {
values.push_back(++extra_values_counter);
}
bool use_2_steps = RandomUniformBool();

int offset = use_2_steps ? kBig : 0;
for (;;) {
ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
bool failed = false;
for (auto value : values) {
auto res = conn.ExecuteFormat(
"INSERT INTO test VALUES ($0, '[$1.0]')", value, value + offset);
if (!res.ok()) {
ASSERT_OK(conn.RollbackTransaction());
LOG(INFO) << "Insert " << value << " failed: " << res;
ASSERT_STR_CONTAINS(res.message().ToBuffer(), "schema version mismatch");
failed = true;
break;
}
}
if (!failed) {
ASSERT_OK(conn.CommitTransaction());
auto now = CoarseMonoClock::Now();
auto prev_last_write = last_write.exchange(now);
if (prev_last_write != CoarseTimePoint() &&
prev_last_write + max_time_without_inserts < now) {
max_time_without_inserts = now - prev_last_write;
LOG(INFO) << "Update max time without inserts: " << max_time_without_inserts;
}
std::this_thread::sleep_for(100ms);
break;
}
}
if (use_2_steps || keep_values != values.size()) {
for (;;) {
ASSERT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
bool failed = false;
for (size_t i = 0; i != values.size(); ++i) {
auto value = values[i];
Status res;
if (i < keep_values) {
res = conn.ExecuteFormat(
"UPDATE test SET embedding = '[$0.0]' WHERE id = $0", value);
} else {
res = conn.ExecuteFormat("DELETE FROM test WHERE id = $0", value);
}
if (!res.ok()) {
ASSERT_OK(conn.RollbackTransaction());
LOG(INFO) <<
(i < keep_values ? "Update " : "Delete " ) << value << " failed: " << res;
ASSERT_STR_CONTAINS(res.message().ToBuffer(), "schema version mismatch");
failed = true;
break;
}
}
if (!failed) {
ASSERT_OK(conn.CommitTransaction());
std::this_thread::sleep_for(100ms);
break;
}
}
}
ASSERT_NO_FATALS(writer.Perform(conn));
}
done = true;
});
}
while (counter.load() < 32) {
std::this_thread::sleep_for(10ms);
}
writer.WaitWritten(32);
LOG(INFO) << "Started to create index";
// TODO(vector_index) Switch to using CONCURRENT index creation when it will be ready.
ASSERT_OK(conn_->Execute(
"CREATE INDEX NONCONCURRENTLY ON test USING ybhnsw (embedding vector_l2_ops)"));
LOG(INFO) << "Finished to create index";
auto limit = counter.load() + 32;
while (counter.load() < limit) {
std::this_thread::sleep_for(10ms);
}
writer.WaitWritten(32);
thread_holder.Stop();
LOG(INFO) << "Max time without inserts: " << max_time_without_inserts;
ASSERT_LT(max_time_without_inserts, 1s * kBackfillSleepSec);
SCOPED_TRACE(Format("Total rows: $0", counter.load()));
LOG(INFO) << "Max time without inserts: " << writer.max_time_without_inserts;
ASSERT_LT(writer.max_time_without_inserts, 1s * kBackfillSleepSec);
SCOPED_TRACE(Format("Total rows: $0", writer.counter.load()));
for (size_t i = 0; i != cluster_->num_tablet_servers(); ++i) {
tserver::VerifyVectorIndexesRequestPB req;
tserver::VerifyVectorIndexesResponsePB resp;
Expand All @@ -2684,11 +2719,7 @@ TEST_F(PgIndexBackfillTest, VectorIndex) {
ASSERT_OK(proxy->VerifyVectorIndexes(req, &resp, &controller));
ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
}
for (int i = 2; i < counter.load(); ++i) {
auto rows = ASSERT_RESULT(conn_->FetchAllAsString(Format(
"SELECT id FROM test ORDER BY embedding <-> '[$0]' LIMIT 3", i * 1.0 - 0.01)));
ASSERT_EQ(rows, Format("$0; $1; $2", i, i - 1, i + 1));
}
writer.Verify(*conn_);
}

} // namespace yb::pgwrapper

0 comments on commit 0b4622e

Please sign in to comment.