Skip to content

Commit

Permalink
Fix needs_flush assertion in file ingestion (#13045)
Browse files Browse the repository at this point in the history
Summary:
This PR makes file ingestion job's flush wait a bit further until the SuperVersion is also updated. This is necessary since follow up operations will use the current SuperVersion to do range overlapping check and level assignment.

In debug mode, file ingestion job's second `NeedsFlush` call could have been invoked when the memtables are flushed but the SuperVersion hasn't been updated yet, triggering the assertion.

Pull Request resolved: #13045

Test Plan:
Existing tests
Manually stress tested

Reviewed By: cbi42

Differential Revision: D63671151

Pulled By: jowlyzhang

fbshipit-source-id: 95a169e58a7e59f6dd4125e7296e9060fe4c63a7
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Oct 3, 2024
1 parent dd23e84 commit 9375c3b
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 16 deletions.
6 changes: 3 additions & 3 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,9 @@ class ColumnFamilyData {
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
uint64_t GetLiveSstFilesSize() const; // REQUIRE: DB mutex held
uint64_t GetTotalBlobFileSize() const; // REQUIRE: DB mutex held
// REQUIRE: DB mutex held
void SetMemtable(MemTable* new_mem) {
uint64_t memtable_id = last_memtable_id_.fetch_add(1) + 1;
new_mem->SetID(memtable_id);
new_mem->SetID(++last_memtable_id_);
mem_ = new_mem;
}

Expand Down Expand Up @@ -669,7 +669,7 @@ class ColumnFamilyData {
bool allow_2pc_;

// Memtable id to track flush.
std::atomic<uint64_t> last_memtable_id_;
uint64_t last_memtable_id_;

// Directories corresponding to cf_paths.
std::vector<std::shared_ptr<FSDirectory>> data_dirs_;
Expand Down
11 changes: 6 additions & 5 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2076,17 +2076,18 @@ class DBImpl : public DB {
// memtable pending flush.
// resuming_from_bg_err indicates whether the caller is attempting to resume
// from background error.
Status WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id = nullptr,
bool resuming_from_bg_err = false) {
Status WaitForFlushMemTable(
ColumnFamilyData* cfd, const uint64_t* flush_memtable_id = nullptr,
bool resuming_from_bg_err = false,
std::optional<FlushReason> flush_reason = std::nullopt) {
return WaitForFlushMemTables({cfd}, {flush_memtable_id},
resuming_from_bg_err);
resuming_from_bg_err, flush_reason);
}
// Wait for memtables to be flushed for multiple column families.
Status WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids,
bool resuming_from_bg_err);
bool resuming_from_bg_err, std::optional<FlushReason> flush_reason);

inline void WaitForPendingWrites() {
mutex_.AssertHeld();
Expand Down
20 changes: 15 additions & 5 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2407,7 +2407,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
s = WaitForFlushMemTables(
cfds, flush_memtable_ids,
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */);
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */,
flush_reason);
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* tmp_cfd : cfds) {
tmp_cfd->UnrefAndTryDelete();
Expand Down Expand Up @@ -2549,7 +2550,8 @@ Status DBImpl::AtomicFlushMemTables(
}
s = WaitForFlushMemTables(
cfds, flush_memtable_ids,
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */);
flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */,
flush_reason);
InstrumentedMutexLock lock_guard(&mutex_);
for (auto* cfd : cfds) {
cfd->UnrefAndTryDelete();
Expand Down Expand Up @@ -2612,7 +2614,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason,
flush_memtable_id_ptrs.push_back(&flush_memtable_id);
}
s = WaitForFlushMemTables(cfds, flush_memtable_id_ptrs,
true /* resuming_from_bg_err */);
true /* resuming_from_bg_err */, flush_reason);
mutex_.Lock();
}

Expand Down Expand Up @@ -2712,7 +2714,7 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
Status DBImpl::WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids,
bool resuming_from_bg_err) {
bool resuming_from_bg_err, std::optional<FlushReason> flush_reason) {
int num = static_cast<int>(cfds.size());
// Wait until the compaction completes
InstrumentedMutexLock l(&mutex_);
Expand Down Expand Up @@ -2750,7 +2752,15 @@ Status DBImpl::WaitForFlushMemTables(
(flush_memtable_ids[i] != nullptr &&
cfds[i]->imm()->GetEarliestMemTableID() >
*flush_memtable_ids[i])) {
++num_finished;
// Make file ingestion's flush wait until SuperVersion is also updated
// since after flush, it does range overlapping check and file level
// assignment with the current SuperVersion.
if (!flush_reason.has_value() ||
flush_reason.value() != FlushReason::kExternalFileIngestion ||
cfds[i]->GetSuperVersion()->imm->GetID() ==
cfds[i]->imm()->current()->GetID()) {
++num_finished;
}
}
}
if (1 == num_dropped && 1 == num) {
Expand Down
15 changes: 14 additions & 1 deletion db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,15 @@ bool MemTableListVersion::MemtableLimitExceeded(size_t usage) {
}
}

bool MemTableListVersion::HistoryShouldBeTrimmed(size_t usage) {
return MemtableLimitExceeded(usage) && !memlist_history_.empty();
}

// Make sure we don't use up too much space in history
bool MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
size_t usage) {
bool ret = false;
while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) {
while (HistoryShouldBeTrimmed(usage)) {
MemTable* x = memlist_history_.back();
memlist_history_.pop_back();

Expand Down Expand Up @@ -661,8 +665,16 @@ void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
}

bool MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
// Check if history trim is needed first, so that we can avoid installing a
// new MemTableListVersion without installing a SuperVersion (installed based
// on return value of this function).
if (!current_->HistoryShouldBeTrimmed(usage)) {
ResetTrimHistoryNeeded();
return false;
}
InstallNewVersion();
bool ret = current_->TrimHistory(to_delete, usage);
assert(ret);
UpdateCachedValuesFromMemTableListVersion();
ResetTrimHistoryNeeded();
return ret;
Expand Down Expand Up @@ -714,6 +726,7 @@ void MemTableList::InstallNewVersion() {
// somebody else holds the current version, we need to create new one
MemTableListVersion* version = current_;
current_ = new MemTableListVersion(&current_memory_usage_, *version);
current_->SetID(++last_memtable_list_version_id_);
current_->Ref();
version->Unref();
}
Expand Down
21 changes: 19 additions & 2 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ class MemTableListVersion {
// Return kMaxSequenceNumber if the list is empty.
SequenceNumber GetFirstSequenceNumber() const;

// REQUIRES: db_mutex held.
void SetID(uint64_t id) { id_ = id; }

uint64_t GetID() const { return id_; }

private:
friend class MemTableList;

Expand All @@ -161,7 +166,11 @@ class MemTableListVersion {
// REQUIRE: m is an immutable memtable
void Remove(MemTable* m, autovector<MemTable*>* to_delete);

// Return true if memtable is trimmed
// Return true if the memtable list should be trimmed to get memory usage
// under budget.
bool HistoryShouldBeTrimmed(size_t usage);

// Trim history, Return true if memtable is trimmed
bool TrimHistory(autovector<MemTable*>* to_delete, size_t usage);

bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
Expand Down Expand Up @@ -205,6 +214,9 @@ class MemTableListVersion {
int refs_ = 0;

size_t* parent_memtable_list_memory_usage_;

// MemtableListVersion id to track for flush results checking.
uint64_t id_ = 0;
};

// This class stores references to all the immutable memtables.
Expand Down Expand Up @@ -235,7 +247,8 @@ class MemTableList {
flush_requested_(false),
current_memory_usage_(0),
current_memory_allocted_bytes_excluding_last_(0),
current_has_history_(false) {
current_has_history_(false),
last_memtable_list_version_id_(0) {
current_->Ref();
}

Expand Down Expand Up @@ -500,6 +513,10 @@ class MemTableList {

// Cached value of current_->HasHistory().
std::atomic<bool> current_has_history_;

// Last memtabe list version id, increase by 1 each time a new
// MemtableListVersion is installed.
uint64_t last_memtable_list_version_id_;
};

// Installs memtable atomic flush results.
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2037,6 +2037,7 @@ struct FlushOptions {
// is performed by someone else (foreground call or background thread).
// Default: false
bool allow_write_stall;

FlushOptions() : wait(true), allow_write_stall(false) {}
};

Expand Down

0 comments on commit 9375c3b

Please sign in to comment.