Skip to content

Commit

Permalink
Storages: refine VectorIndexCache to LocalIndexCache (#9840)
Browse files Browse the repository at this point in the history
ref #9032

In order to accept more index kinds like FullTextIndex, this PR refine VectorIndexCache to LocalIndexCache.

Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger authored Feb 6, 2025
1 parent fe563a1 commit 1697c72
Show file tree
Hide file tree
Showing 25 changed files with 126 additions and 82 deletions.
20 changes: 10 additions & 10 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/File/ColumnCacheLongTerm.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
Expand Down Expand Up @@ -148,7 +148,7 @@ struct ContextShared
mutable DBGInvoker dbg_invoker; /// Execute inner functions, debug only.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable DM::MinMaxIndexCachePtr minmax_index_cache; /// Cache of minmax index in compressed files.
mutable DM::VectorIndexCachePtr vector_index_cache;
mutable DM::LocalIndexCachePtr local_index_cache;
mutable DM::ColumnCacheLongTermPtr column_cache_long_term;
mutable DM::DeltaIndexManagerPtr delta_index_manager; /// Manage the Delta Indies of Segments.
ProcessList process_list; /// Executing queries at the moment.
Expand Down Expand Up @@ -1370,26 +1370,26 @@ void Context::dropMinMaxIndexCache() const
shared->minmax_index_cache->reset();
}

void Context::setVectorIndexCache(size_t cache_entities)
void Context::setLocalIndexCache(size_t cache_entities)
{
auto lock = getLock();

RUNTIME_CHECK(!shared->vector_index_cache);
RUNTIME_CHECK(!shared->local_index_cache);

shared->vector_index_cache = std::make_shared<DM::VectorIndexCache>(cache_entities);
shared->local_index_cache = std::make_shared<DM::LocalIndexCache>(cache_entities);
}

DM::VectorIndexCachePtr Context::getVectorIndexCache() const
DM::LocalIndexCachePtr Context::getLocalIndexCache() const
{
auto lock = getLock();
return shared->vector_index_cache;
return shared->local_index_cache;
}

void Context::dropVectorIndexCache() const
void Context::dropLocalIndexCache() const
{
auto lock = getLock();
if (shared->vector_index_cache)
shared->vector_index_cache.reset();
if (shared->local_index_cache)
shared->local_index_cache.reset();
}

void Context::setColumnCacheLongTerm(size_t cache_size_in_bytes)
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ enum class PageStorageRunMode : UInt8;
namespace DM
{
class MinMaxIndexCache;
class VectorIndexCache;
class LocalIndexCache;
class ColumnCacheLongTerm;
class DeltaIndexManager;
class GlobalStoragePool;
Expand Down Expand Up @@ -399,9 +399,9 @@ class Context
std::shared_ptr<DM::MinMaxIndexCache> getMinMaxIndexCache() const;
void dropMinMaxIndexCache() const;

void setVectorIndexCache(size_t cache_entities);
std::shared_ptr<DM::VectorIndexCache> getVectorIndexCache() const;
void dropVectorIndexCache() const;
void setLocalIndexCache(size_t cache_entities);
std::shared_ptr<DM::LocalIndexCache> getLocalIndexCache() const;
void dropLocalIndexCache() const;

void setColumnCacheLongTerm(size_t cache_size_in_bytes);
std::shared_ptr<DM::ColumnCacheLongTerm> getColumnCacheLongTerm() const;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// The vector index cache by number instead of bytes. Because it use `mmap` and let the operator system decide the memory usage.
size_t vec_index_cache_entities = config().getUInt64("vec_index_cache_entities", 1000);
if (vec_index_cache_entities)
global_context->setVectorIndexCache(vec_index_cache_entities);
global_context->setLocalIndexCache(vec_index_cache_entities);

size_t column_cache_long_term_size
= config().getUInt64("column_cache_long_term_size", 512 * 1024 * 1024 /* 512MB */);
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/ColumnDefine_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
#include <TiDB/Schema/TiDB_fwd.h>

#include <memory>
#include <unordered_map>
#include <vector>

namespace DB::DM
{
struct ColumnDefine;
using ColumnDefines = std::vector<ColumnDefine>;
using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>;
using ColumnDefineMap = std::unordered_map<DB::ColumnID, ColumnDefine>;

} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStre
const ANNQueryInfoPtr ann_query_info;
const BitmapFilterView valid_rows;
// Global vector index cache
const VectorIndexCachePtr vec_index_cache;
const LocalIndexCachePtr vec_index_cache;
const ColumnDefine vec_cd;
const ColumnDefinesPtr rest_col_defs;

Expand Down Expand Up @@ -69,7 +69,7 @@ class ColumnFileSetWithVectorIndexInputStream : public VectorIndexBlockInputStre
, data_provider(data_provider_)
, ann_query_info(ann_query_info_)
, valid_rows(std::move(valid_rows_))
, vec_index_cache(context_.global_context.getVectorIndexCache())
, vec_index_cache(context_.global_context.getLocalIndexCache())
, vec_cd(std::move(vec_cd_))
, rest_col_defs(rest_col_defs_)
, column_files(reader.snapshot->getColumnFiles())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTinyVectorIndexReader.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache.h>
#include <Storages/DeltaMerge/Index/VectorSearchPerf.h>

namespace DB::DM
Expand Down Expand Up @@ -84,10 +84,11 @@ void ColumnFileTinyVectorIndexReader::loadVectorIndex()
CompressedReadBuffer compressed(read_buf);
return VectorIndexViewer::load(index_info_iter->index_props().vector_index(), compressed);
};
if (vec_index_cache)
if (local_index_cache)
{
const auto key = fmt::format("{}{}", VectorIndexCache::COLUMNFILETINY_INDEX_NAME_PREFIX, index_page_id);
vec_index = vec_index_cache->getOrSet(key, load_from_page_storage);
const auto key = fmt::format("{}{}", LocalIndexCache::COLUMNFILETINY_INDEX_NAME_PREFIX, index_page_id);
auto local_index = local_index_cache->getOrSet(key, load_from_page_storage);
vec_index = std::dynamic_pointer_cast<VectorIndexViewer>(local_index);
}
else
vec_index = load_from_page_storage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#include <Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFile.h>
#include <Storages/DeltaMerge/Index/LocalIndex_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache_fwd.h>


namespace DB::DM
Expand All @@ -35,8 +35,8 @@ class ColumnFileTinyVectorIndexReader
const BitmapFilterView valid_rows;
// Note: ColumnDefine comes from read path does not have vector_index fields.
const ColumnDefine vec_cd;
// Global vector index cache
const VectorIndexCachePtr vec_index_cache;
// Global local index cache
const LocalIndexCachePtr local_index_cache;
LoggerPtr log;

// Performance statistics
Expand All @@ -62,13 +62,13 @@ class ColumnFileTinyVectorIndexReader
const ANNQueryInfoPtr & ann_query_info_,
const BitmapFilterView && valid_rows_,
const ColumnDefine & vec_cd_,
const VectorIndexCachePtr & vec_index_cache_)
const LocalIndexCachePtr & local_index_cache_)
: tiny_file(tiny_file_)
, data_provider(data_provider_)
, ann_query_info(ann_query_info_)
, valid_rows(std::move(valid_rows_))
, vec_cd(vec_cd_)
, vec_index_cache(vec_index_cache_)
, local_index_cache(local_index_cache_)
, log(Logger::get())
{}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#pragma once

#include <Flash/ResourceControl/LocalAdmissionController.h>
#include <Storages/DeltaMerge/Index/VectorIndex_fwd.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>
#include <Storages/DeltaMerge/VectorIndexBlockInputStream.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class DMFile : private boost::noncopyable
*/
ColumnDefines getColumnDefines(bool sort_by_id = true) const
{
ColumnDefines results{};
ColumnDefines results;
results.reserve(this->meta->column_stats.size());
for (const auto & cs : this->meta->column_stats)
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & con
setCaches(
global_context.getMarkCache(),
global_context.getMinMaxIndexCache(),
global_context.getVectorIndexCache(),
global_context.getLocalIndexCache(),
global_context.getColumnCacheLongTerm());
// init from settings
setFromSettings(context.getSettingsRef());
Expand Down Expand Up @@ -229,7 +229,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
std::move(rest_columns_reader),
std::move(vec_column.value()),
scan_context,
vector_index_cache,
local_index_cache,
bitmap_filter.value(),
tracing_id);

Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <Storages/DeltaMerge/File/ColumnCacheLongTerm_fwd.h>
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache_fwd.h>
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
Expand Down Expand Up @@ -205,12 +204,12 @@ class DMFileBlockInputStreamBuilder
DMFileBlockInputStreamBuilder & setCaches(
const MarkCachePtr & mark_cache_,
const MinMaxIndexCachePtr & index_cache_,
const VectorIndexCachePtr & vector_index_cache_,
const LocalIndexCachePtr & local_index_cache_,
const ColumnCacheLongTermPtr & column_cache_long_term_)
{
mark_cache = mark_cache_;
index_cache = index_cache_;
vector_index_cache = vector_index_cache_;
local_index_cache = local_index_cache_;
column_cache_long_term = column_cache_long_term_;
return *this;
}
Expand Down Expand Up @@ -243,7 +242,7 @@ class DMFileBlockInputStreamBuilder

ANNQueryInfoPtr ann_query_info = nullptr;

VectorIndexCachePtr vector_index_cache;
LocalIndexCachePtr local_index_cache;
// Note: Currently thie field is assigned only for Stable streams, not available for ColumnFileBig
std::optional<BitmapFilterView> bitmap_filter;

Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <Common/TiFlashMetrics.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFileVectorIndexReader.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/Index/LocalIndexCache.h>
#include <Storages/DeltaMerge/Index/VectorSearchPerf.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/S3/FileCache.h>
Expand Down Expand Up @@ -130,13 +130,18 @@ void DMFileVectorIndexReader::loadVectorIndex()
};

Stopwatch watch;
if (vec_index_cache)
if (local_index_cache)
{
// Note: must use local_index_file_path as the cache key, because cache
// will check whether file is still valid and try to remove memory references
// when file is dropped.
vec_index = vec_index_cache->getOrSet(local_index_file_path, load_from_file);
auto local_index = local_index_cache->getOrSet(local_index_file_path, load_from_file);
vec_index = std::dynamic_pointer_cast<VectorIndexViewer>(local_index);
}
else
{
vec_index = load_from_file();
}

perf_stat.duration_load_index += watch.elapsedSeconds();
RUNTIME_CHECK(vec_index != nullptr);
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndex_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache_fwd.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
Expand All @@ -30,8 +31,8 @@ class DMFileVectorIndexReader
const ANNQueryInfoPtr & ann_query_info;
const BitmapFilterView valid_rows;
const ScanContextPtr & scan_context;
// Global vector index cache
const VectorIndexCachePtr vec_index_cache;
// Global local index cache
const LocalIndexCachePtr local_index_cache;

// Performance statistics
struct PerfStat
Expand Down Expand Up @@ -60,12 +61,12 @@ class DMFileVectorIndexReader
const DMFilePtr & dmfile_,
const BitmapFilterView & valid_rows_,
const ScanContextPtr & scan_context_,
const VectorIndexCachePtr & vec_index_cache_)
const LocalIndexCachePtr & local_index_cache_)
: dmfile(dmfile_)
, ann_query_info(ann_query_info_)
, valid_rows(valid_rows_)
, scan_context(scan_context_)
, vec_index_cache(vec_index_cache_)
, local_index_cache(local_index_cache_)
, perf_stat()
{}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ DMFileWithVectorIndexBlockInputStream::DMFileWithVectorIndexBlockInputStream(
DMFileReader && reader_,
ColumnDefine && vec_cd_,
const ScanContextPtr & scan_context_,
const VectorIndexCachePtr & vec_index_cache_,
const LocalIndexCachePtr & local_index_cache_,
const BitmapFilterView & valid_rows_,
const String & tracing_id)
: log(Logger::get(tracing_id))
Expand All @@ -41,7 +41,7 @@ DMFileWithVectorIndexBlockInputStream::DMFileWithVectorIndexBlockInputStream(
dmfile,
valid_rows_,
scan_context,
vec_index_cache_))
local_index_cache_))
{}

DMFileWithVectorIndexBlockInputStream::~DMFileWithVectorIndexBlockInputStream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/DeltaMerge/File/DMFileVectorIndexReader.h>
#include <Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream_fwd.h>
#include <Storages/DeltaMerge/Index/VectorIndex_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndex_fwd.h>
#include <Storages/DeltaMerge/VectorIndexBlockInputStream.h>


Expand Down Expand Up @@ -60,7 +60,7 @@ class DMFileWithVectorIndexBlockInputStream : public VectorIndexBlockInputStream
DMFileReader && reader,
ColumnDefine && vec_cd,
const ScanContextPtr & scan_context,
const VectorIndexCachePtr & vec_index_cache,
const LocalIndexCachePtr & local_index_cache,
const BitmapFilterView & valid_rows,
const String & tracing_id)
{
Expand All @@ -71,7 +71,7 @@ class DMFileWithVectorIndexBlockInputStream : public VectorIndexBlockInputStream
std::move(reader),
std::move(vec_cd),
scan_context,
vec_index_cache,
local_index_cache,
valid_rows,
tracing_id);
}
Expand All @@ -83,7 +83,7 @@ class DMFileWithVectorIndexBlockInputStream : public VectorIndexBlockInputStream
DMFileReader && reader_,
ColumnDefine && vec_cd_,
const ScanContextPtr & scan_context_,
const VectorIndexCachePtr & vec_index_cache_,
const LocalIndexCachePtr & local_index_cache_,
const BitmapFilterView & valid_rows_,
const String & tracing_id);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Filter/RSOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
#include <Common/FieldVisitors.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/Filter/RSOperator_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndex_fwd.h>
#include <Storages/DeltaMerge/Index/RSIndex.h>
#include <Storages/DeltaMerge/Index/RSResult.h>
#include <Storages/DeltaMerge/Index/VectorIndex_fwd.h>

namespace DB
{
Expand Down
Loading

0 comments on commit 1697c72

Please sign in to comment.