Skip to content

Commit

Permalink
[#25431] YSQL: Move FK cache into separate file
Browse files Browse the repository at this point in the history
Summary:
This diff perfroms refactoring only, no logic are affected by the change.

The following code redesign changes are made:
- All independent fields in `PgSession` class related to FK optimization (`fk_reference_cache_`, `fk_reference_intent_`, `fk_intent_region_local_tables_`) are placed into newly created class named `PgFKReferenceCache`
- The instance of `PgFKReferenceCache` is placed in `PgApiImpl` instead of `PgSession` because `PgSession` does nothing with FK cache.
- Two tightly coupled entities `TableYbctidVectorProvider` and `YbctidReader` are substituted with single class named `YbctidReaderProvider`
- The instance of `ExplicitRowLockBuffer` is moved from `PgSession` into `PgSession` because `PgSession` does nothing with it.
Jira: DB-14665

Test Plan: Jenkins

Reviewers: pjain, myang, kramanathan

Reviewed By: myang

Subscribers: yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D42177
  • Loading branch information
d-uspenskiy committed Feb 27, 2025
1 parent da482e2 commit 9fc2912
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 205 deletions.
2 changes: 1 addition & 1 deletion src/postgres/src/backend/tcop/postgres.c
Original file line number Diff line number Diff line change
Expand Up @@ -5147,7 +5147,7 @@ yb_restart_current_stmt(int attempt, bool is_read_restart)
GetCurrentSubTransactionId(), GetCurrentSubTransactionId() + 1);

/*
* TODO(Piyush): Perform pg_session_->InvalidateForeignKeyReferenceCache()
* TODO(Piyush): Perform foreign key reference cacahe cleanup
* and create tests that would fail without this.
*/

Expand Down
1 change: 1 addition & 0 deletions src/yb/yql/pggate/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ set(PGGATE_SRCS
pg_doc_op.cc
pg_explicit_row_lock_buffer.cc
pg_expr.cc
pg_fk_reference_cache.cc
pg_function.cc
pg_function_helpers.cc
pggate.cc
Expand Down
26 changes: 11 additions & 15 deletions src/yb/yql/pggate/pg_explicit_row_lock_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@
#include "yb/common/pgsql_error.h"
#include "yb/common/transaction_error.h"


#include "yb/util/scope_exit.h"

#include "yb/yql/pggate/util/ybc_util.h"

namespace yb::pggate {

ExplicitRowLockBuffer::ExplicitRowLockBuffer(
TableYbctidVectorProvider& ybctid_container_provider,
std::reference_wrapper<const YbctidReader> ybctid_reader)
: ybctid_container_provider_(ybctid_container_provider), ybctid_reader_(ybctid_reader) {
}
ExplicitRowLockBuffer::ExplicitRowLockBuffer(YbctidReaderProvider& reader_provider)
: reader_provider_(reader_provider) {}

Status ExplicitRowLockBuffer::Add(
const Info& info, const LightweightTableYbctid& key, bool is_region_local,
Expand Down Expand Up @@ -71,23 +67,23 @@ Status ExplicitRowLockBuffer::DoFlush(std::optional<ErrorStatusAdditionalInfo>&
}

Status ExplicitRowLockBuffer::DoFlushImpl() {
auto ybctids = ybctid_container_provider_.Get();
const auto initial_intents_size = intents_.size();
ybctids->reserve(initial_intents_size);
const auto intents_count = intents_.size();
auto reader = reader_provider_();
reader.Reserve(intents_count);
for (auto it = intents_.begin(); it != intents_.end();) {
auto node = intents_.extract(it++);
ybctids->push_back(std::move(node.value()));
reader.Add(std::move(node.value()));
}
RETURN_NOT_OK(ybctid_reader_(
info_->database_id, ybctids, region_local_tables_,
const auto existing_ybctids_count = VERIFY_RESULT(reader.Read(
info_->database_id, region_local_tables_,
make_lw_function(
[&info = *info_](YbcPgExecParameters& params) {
params.rowmark = info.rowmark;
params.pg_wait_policy = info.pg_wait_policy;
params.docdb_wait_policy = info.docdb_wait_policy;
})));
SCHECK_EQ(ybctids->size(), initial_intents_size, NotFound,
"Some of the requested ybctids are missing");
}))).size();
SCHECK_EQ(
existing_ybctids_count, intents_count, NotFound, "Some of the requested ybctids are missing");
return Status::OK();
}

Expand Down
9 changes: 3 additions & 6 deletions src/yb/yql/pggate/pg_explicit_row_lock_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

#pragma once

#include <functional>
#include <optional>

#include "yb/gutil/macros.h"
Expand Down Expand Up @@ -43,9 +42,8 @@ class ExplicitRowLockBuffer {
PgOid conflicting_table_id;
};

ExplicitRowLockBuffer(
TableYbctidVectorProvider& ybctid_container_provider,
std::reference_wrapper<const YbctidReader> ybctid_reader);
explicit ExplicitRowLockBuffer(YbctidReaderProvider& reader_provider);

Status Add(
const Info& info, const LightweightTableYbctid& key, bool is_region_local,
std::optional<ErrorStatusAdditionalInfo>& error_info);
Expand All @@ -57,8 +55,7 @@ class ExplicitRowLockBuffer {
Status DoFlush(std::optional<ErrorStatusAdditionalInfo>& error_info);
Status DoFlushImpl();

TableYbctidVectorProvider& ybctid_container_provider_;
const YbctidReader& ybctid_reader_;
YbctidReaderProvider& reader_provider_;
TableYbctidSet intents_;
OidSet region_local_tables_;
std::optional<Info> info_;
Expand Down
149 changes: 149 additions & 0 deletions src/yb/yql/pggate/pg_fk_reference_cache.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/yql/pggate/pg_fk_reference_cache.h"

#include <algorithm>
#include <string>
#include <utility>

#include "yb/gutil/port.h"
#include "yb/util/flags/flag_tags.h"
#include "yb/util/logging.h"

#include "yb/yql/pggate/pg_tools.h"

DEFINE_test_flag(bool, ysql_ignore_add_fk_reference, false,
"Don't fill YSQL's internal cache for FK check to force read row from a table");

namespace yb::pggate {

namespace {

template<class Container, class Key>
void Erase(Container& container, const Key& key) {
const auto it = container.find(key);
if (it != container.end()) {
container.erase(it);
}
}

} // namespace

class PgFKReferenceCache::Impl {
public:
Impl(YbctidReaderProvider& reader_provider, const BufferingSettings& buffering_settings)
: reader_provider_(reader_provider), buffering_settings_(buffering_settings) {}

void Clear() {
references_.clear();
intents_.clear();
region_local_tables_.clear();
}

void DeleteReference(const LightweightTableYbctid& key) {
Erase(references_, key);
}

void AddReference(const LightweightTableYbctid& key) {
if (!references_.contains(key) && PREDICT_TRUE(!FLAGS_TEST_ysql_ignore_add_fk_reference)) {
references_.emplace(key.table_id, key.ybctid);
}
}

Result<bool> IsReferenceExists(PgOid database_id, const LightweightTableYbctid& key) {
if (references_.contains(key)) {
return true;
}

// Check existence of required FK intent.
// Absence means the key was checked by previous batched request and was not found.
// We don't need to call the reader in this case.
auto it = intents_.find(key);
if (it == intents_.end()) {
return false;
}

auto reader = reader_provider_();
auto available_capacity = std::min<size_t>(
intents_.size(), buffering_settings_.max_batch_size);
reader.Reserve(available_capacity);
// If the reader fails to get the result, we fail the whole operation (and transaction).
// Hence it's ok to extract (erase) the keys from intent before calling reader.
reader.Add(std::move(intents_.extract(it).value()));
--available_capacity;

for (auto it = intents_.begin();
it != intents_.end() && available_capacity; --available_capacity) {
reader.Add(std::move(intents_.extract(it++).value()));
}

// Add the keys found in docdb to the FK cache.
auto ybctids = VERIFY_RESULT(reader.Read(
database_id, region_local_tables_,
make_lw_function([](YbcPgExecParameters& params) { params.rowmark = ROW_MARK_KEYSHARE; })));
for (const auto& ybctid : ybctids) {
references_.emplace(ybctid.table_id, ybctid.ybctid);
}
return references_.contains(key);
}

void AddIntent(const LightweightTableYbctid& key, bool is_region_local) {
if (references_.contains(key)) {
return;
}

if (is_region_local) {
region_local_tables_.insert(key.table_id);
}
DCHECK(is_region_local || !region_local_tables_.contains(key.table_id));
intents_.emplace(key.table_id, std::string(key.ybctid));
}

private:
YbctidReaderProvider& reader_provider_;
const BufferingSettings& buffering_settings_;
MemoryOptimizedTableYbctidSet references_;
TableYbctidSet intents_;
OidSet region_local_tables_;
};

PgFKReferenceCache::PgFKReferenceCache(
YbctidReaderProvider& reader_provider,
std::reference_wrapper<const BufferingSettings> buffering_settings)
: impl_(new Impl(reader_provider, buffering_settings)) {}

PgFKReferenceCache::~PgFKReferenceCache() = default;

void PgFKReferenceCache::Clear() {
impl_->Clear();
}

void PgFKReferenceCache::DeleteReference(const LightweightTableYbctid& key) {
impl_->DeleteReference(key);
}

void PgFKReferenceCache::AddReference(const LightweightTableYbctid& key) {
impl_->AddReference(key);
}

Result<bool> PgFKReferenceCache::IsReferenceExists(
PgOid database_id, const LightweightTableYbctid& key) {
return impl_->IsReferenceExists(database_id, key);
}

void PgFKReferenceCache::AddIntent(const LightweightTableYbctid& key, bool is_region_local) {
impl_->AddIntent(key, is_region_local);
}

} // namespace yb::pggate
47 changes: 47 additions & 0 deletions src/yb/yql/pggate/pg_fk_reference_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#pragma once

#include <functional>
#include <memory>

#include "yb/common/pg_types.h"

#include "yb/util/result.h"

namespace yb::pggate {

struct LightweightTableYbctid;
class YbctidReaderProvider;
struct BufferingSettings;

class PgFKReferenceCache {
public:
PgFKReferenceCache(YbctidReaderProvider& reader_provider,
std::reference_wrapper<const BufferingSettings> buffering_settings);
~PgFKReferenceCache();

void Clear();
void DeleteReference(const LightweightTableYbctid& key);
void AddReference(const LightweightTableYbctid& key);
Result<bool> IsReferenceExists(PgOid database_id, const LightweightTableYbctid& key);
void AddIntent(const LightweightTableYbctid& key, bool is_region_local);

private:
class Impl;

std::unique_ptr<Impl> impl_;
};

} // namespace yb::pggate
6 changes: 0 additions & 6 deletions src/yb/yql/pggate/pg_operation_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ class PgDocMetrics;
class PgSession;
class PgTableDesc;

struct BufferingSettings {
size_t max_batch_size;
size_t max_in_flight_operations;
int multiple;
};

class BufferableOperations {
public:
const PgsqlOps& operations() const { return operations_; }
Expand Down
Loading

0 comments on commit 9fc2912

Please sign in to comment.