Skip to content

Commit

Permalink
Merge pull request #22 from Maxxen/perf
Browse files Browse the repository at this point in the history
add progressbar support for index creation
  • Loading branch information
Maxxen authored Jun 26, 2024
2 parents 8e3a622 + 72a5ee8 commit 02116cc
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 1639 files
26 changes: 26 additions & 0 deletions src/hnsw/hnsw_index_physical_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class CreateHNSWIndexGlobalState final : public GlobalSinkState {

// Parallel scan state
ColumnDataParallelScanState scan_state;

// Track which phase we're in
atomic<bool> is_building = {false};
atomic<idx_t> loaded_count = {0};
atomic<idx_t> built_count = {0};
};

unique_ptr<GlobalSinkState> PhysicalCreateHNSWIndex::GetGlobalSinkState(ClientContext &context) const {
Expand Down Expand Up @@ -90,7 +95,9 @@ SinkResultType PhysicalCreateHNSWIndex::Sink(ExecutionContext &context, DataChun
OperatorSinkInput &input) const {

auto &lstate = input.local_state.Cast<CreateHNSWIndexLocalState>();
auto &gstate = input.global_state.Cast<CreateHNSWIndexGlobalState>();
lstate.collection->Append(lstate.append_state, chunk);
gstate.loaded_count += chunk.size();
return SinkResultType::NEED_MORE_INPUT;
}

Expand Down Expand Up @@ -178,6 +185,9 @@ class HNSWIndexConstructTask final : public ExecutorTask {
}
}

// Update the built count
gstate.built_count += count;

if (mode == TaskExecutionMode::PROCESS_PARTIAL) {
// yield!
return TaskExecutionResult::TASK_NOT_FINISHED;
Expand Down Expand Up @@ -273,6 +283,9 @@ SinkFinalizeType PhysicalCreateHNSWIndex::Finalize(Pipeline &pipeline, Event &ev
auto &gstate = input.global_state.Cast<CreateHNSWIndexGlobalState>();
auto &collection = gstate.collection;

// Move on to the next phase
gstate.is_building = true;

// Reserve the index size
auto &index = gstate.global_index->index;
index.reserve(collection->Count());
Expand All @@ -287,4 +300,17 @@ SinkFinalizeType PhysicalCreateHNSWIndex::Finalize(Pipeline &pipeline, Event &ev
return SinkFinalizeType::READY;
}

// Reports the progress of the CREATE INDEX sink as a percentage in [0, 100].
//
// The work is split into two equally weighted phases:
//   1. [0, 50]   - rows are appended to the collection (tracked by loaded_count,
//                  measured against the operator's estimated_cardinality).
//   2. [50, 100] - the index is built from the collected rows (tracked by
//                  built_count, measured against loaded_count).
//
// "context" and "source_progress" are unused: the source progress is not
// relevant for CREATE INDEX statements.
//
// Both ratios are guarded against a zero denominator so that an empty input
// (or a zero cardinality estimate) never produces NaN.
double PhysicalCreateHNSWIndex::GetSinkProgress(ClientContext &context, GlobalSinkState &gstate,
                                                double source_progress) const {
	const auto &state = gstate.Cast<CreateHNSWIndexGlobalState>();

	// Snapshot the counter once so the checks and the division below see the same value
	const auto loaded = state.loaded_count.load();

	// First half of the progress is appending to the collection
	if (!state.is_building) {
		if (loaded == 0) {
			// Nothing appended yet; also avoids 0/0 when the estimate is zero
			return 0.0;
		}
		if (estimated_cardinality == 0) {
			// Rows arrived despite a zero estimate: the estimate is exhausted, so cap this phase
			return 50.0;
		}
		return 50.0 * MinValue(1.0, static_cast<double>(loaded) / static_cast<double>(estimated_cardinality));
	}

	// Second half is actually building the index
	if (loaded == 0) {
		// Empty input: there is nothing to build, so the build phase is trivially complete
		return 100.0;
	}
	const auto built = state.built_count.load();
	// Clamp like the first phase so the reported value never exceeds 100
	return 50.0 + 50.0 * MinValue(1.0, static_cast<double>(built) / static_cast<double>(loaded));
}

} // namespace duckdb
2 changes: 2 additions & 0 deletions src/include/hnsw/hnsw_index_physical_create.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class PhysicalCreateHNSWIndex : public PhysicalOperator {
// Always returns true.
// NOTE(review): presumably this tells the scheduler that Sink() may be invoked
// from several threads at once - confirm against the PhysicalOperator base class.
bool ParallelSink() const override {
return true;
}

// Reports sink progress as a percentage: the first half (up to 50) covers
// appending rows to the collection, the second half (50 and up) covers building
// the index. "source_progress" is ignored by the implementation.
double GetSinkProgress(ClientContext &context, GlobalSinkState &gstate, double source_progress) const override;
};

} // namespace duckdb

0 comments on commit 02116cc

Please sign in to comment.