From 49d9673e7e48d8949ab759b2657abd01fea4e8f0 Mon Sep 17 00:00:00 2001 From: yangxuan Date: Mon, 13 Jan 2025 19:45:51 +0800 Subject: [PATCH] enhance: Unify optimize and remove ready_to_load PyMilvus used to be the only client that uses ready_to_load. Now it'll load the collection when creating it, so this PR removes `ready_to_load` from the client API. Also this PR enhances optimize and removes the optimize_with_size. Signed-off-by: yangxuan --- .../aliyun_opensearch/aliyun_opensearch.py | 8 +------ .../backend/clients/alloydb/alloydb.py | 5 +--- vectordb_bench/backend/clients/api.py | 23 +++++++------------ .../clients/aws_opensearch/aws_opensearch.py | 11 ++++----- .../backend/clients/chroma/chroma.py | 5 +--- .../clients/elastic_cloud/elastic_cloud.py | 5 +--- .../backend/clients/memorydb/memorydb.py | 7 ++---- .../backend/clients/milvus/milvus.py | 21 +---------------- .../backend/clients/pgdiskann/pgdiskann.py | 5 +--- .../backend/clients/pgvecto_rs/pgvecto_rs.py | 5 +--- .../backend/clients/pgvector/pgvector.py | 5 +--- .../clients/pgvectorscale/pgvectorscale.py | 5 +--- .../backend/clients/pinecone/pinecone.py | 5 +--- .../clients/qdrant_cloud/qdrant_cloud.py | 5 +--- vectordb_bench/backend/clients/redis/redis.py | 5 +--- vectordb_bench/backend/clients/test/test.py | 5 +--- .../clients/weaviate_cloud/weaviate_cloud.py | 5 +--- .../backend/runner/read_write_runner.py | 2 +- .../backend/runner/serial_runner.py | 4 +--- vectordb_bench/backend/task_runner.py | 6 ++--- 20 files changed, 33 insertions(+), 109 deletions(-) diff --git a/vectordb_bench/backend/clients/aliyun_opensearch/aliyun_opensearch.py b/vectordb_bench/backend/clients/aliyun_opensearch/aliyun_opensearch.py index 00227cff..32487193 100644 --- a/vectordb_bench/backend/clients/aliyun_opensearch/aliyun_opensearch.py +++ b/vectordb_bench/backend/clients/aliyun_opensearch/aliyun_opensearch.py @@ -325,10 +325,7 @@ def need_normalize_cosine(self) -> bool: return False - def optimize(self): - pass - - def 
optimize_with_size(self, data_size: int): + def optimize(self, data_size: int): log.info(f"optimize count: {data_size}") retry_times = 0 while True: @@ -340,6 +337,3 @@ def optimize_with_size(self, data_size: int): if total_count == data_size: log.info("optimize table finish.") return - - def ready_to_load(self): - """ready_to_load will be called before load in load cases.""" diff --git a/vectordb_bench/backend/clients/alloydb/alloydb.py b/vectordb_bench/backend/clients/alloydb/alloydb.py index c81f7767..b9808ce5 100644 --- a/vectordb_bench/backend/clients/alloydb/alloydb.py +++ b/vectordb_bench/backend/clients/alloydb/alloydb.py @@ -149,10 +149,7 @@ def _drop_table(self): ) self.conn.commit() - def ready_to_load(self): - pass - - def optimize(self): + def optimize(self, data_size: int | None = None): self._post_insert() def _post_insert(self): diff --git a/vectordb_bench/backend/clients/api.py b/vectordb_bench/backend/clients/api.py index aa93abc1..a86849e9 100644 --- a/vectordb_bench/backend/clients/api.py +++ b/vectordb_bench/backend/clients/api.py @@ -137,6 +137,13 @@ def __init__( @contextmanager def init(self) -> None: """create and destory connections to database. + Why contextmanager: + + In multiprocessing search tasks, vectordbbench might init + totally hundreds of thousands of connections with DB server. + + Too many connections may drain local FDs or server connection resources. + If the DB client doesn't have `close()` method, just set the object to None. Examples: >>> with self.init(): @@ -187,9 +194,8 @@ def search_embedding( """ raise NotImplementedError - # TODO: remove @abstractmethod - def optimize(self): + def optimize(self, data_size: int | None = None): """optimize will be called between insertion and search in performance cases. Should be blocked until the vectorDB is ready to be tested on @@ -199,16 +205,3 @@ def optimize(self): Optimize's execution time is limited, the limited time is based on cases. 
""" raise NotImplementedError - - def optimize_with_size(self, data_size: int): - self.optimize() - - # TODO: remove - @abstractmethod - def ready_to_load(self): - """ready_to_load will be called before load in load cases. - - Should be blocked until the vectorDB is ready to be tested on - heavy load cases. - """ - raise NotImplementedError diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py index 487ec67c..234014f1 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py +++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py @@ -145,15 +145,15 @@ def search_embedding( docvalue_fields=[self.id_col_name], stored_fields="_none_", ) - log.info(f'Search took: {resp["took"]}') - log.info(f'Search shards: {resp["_shards"]}') - log.info(f'Search hits total: {resp["hits"]["total"]}') + log.info(f"Search took: {resp['took']}") + log.info(f"Search shards: {resp['_shards']}") + log.info(f"Search hits total: {resp['hits']['total']}") return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]] except Exception as e: log.warning(f"Failed to search: {self.index_name} error: {e!s}") raise e from None - def optimize(self): + def optimize(self, data_size: int | None = None): """optimize will be called between insertion and search in performance cases.""" # Call refresh first to ensure that all segments are created self._refresh_index() @@ -194,6 +194,3 @@ def _load_graphs_to_memory(self): log.info("Calling warmup API to load graphs into memory") warmup_endpoint = f"/_plugins/_knn/warmup/{self.index_name}" self.client.transport.perform_request("GET", warmup_endpoint) - - def ready_to_load(self): - """ready_to_load will be called before load in load cases.""" diff --git a/vectordb_bench/backend/clients/chroma/chroma.py b/vectordb_bench/backend/clients/chroma/chroma.py index a148fa14..76c81026 100644 --- 
a/vectordb_bench/backend/clients/chroma/chroma.py +++ b/vectordb_bench/backend/clients/chroma/chroma.py @@ -57,10 +57,7 @@ def init(self) -> None: def ready_to_search(self) -> bool: pass - def ready_to_load(self) -> bool: - pass - - def optimize(self) -> None: + def optimize(self, data_size: int | None = None): pass def insert_embeddings( diff --git a/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py b/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py index a3183bcb..ea038c58 100644 --- a/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py +++ b/vectordb_bench/backend/clients/elastic_cloud/elastic_cloud.py @@ -143,7 +143,7 @@ def search_embedding( log.warning(f"Failed to search: {self.indice} error: {e!s}") raise e from None - def optimize(self): + def optimize(self, data_size: int | None = None): """optimize will be called between insertion and search in performance cases.""" assert self.client is not None, "should self.init() first" self.client.indices.refresh(index=self.indice) @@ -158,6 +158,3 @@ def optimize(self): task_status = self.client.tasks.get(task_id=force_merge_task_id) if task_status["completed"]: return - - def ready_to_load(self): - """ready_to_load will be called before load in load cases.""" diff --git a/vectordb_bench/backend/clients/memorydb/memorydb.py b/vectordb_bench/backend/clients/memorydb/memorydb.py index d05e30be..9d077f5d 100644 --- a/vectordb_bench/backend/clients/memorydb/memorydb.py +++ b/vectordb_bench/backend/clients/memorydb/memorydb.py @@ -157,17 +157,14 @@ def init(self) -> Generator[None, None, None]: self.conn = self.get_client() search_param = self.case_config.search_param() if search_param["ef_runtime"]: - self.ef_runtime_str = f'EF_RUNTIME {search_param["ef_runtime"]}' + self.ef_runtime_str = f"EF_RUNTIME {search_param['ef_runtime']}" else: self.ef_runtime_str = "" yield self.conn.close() self.conn = None - def ready_to_load(self) -> bool: - pass - - def optimize(self) -> None: + def 
optimize(self, data_size: int | None = None): self._post_insert() def insert_embeddings( diff --git a/vectordb_bench/backend/clients/milvus/milvus.py b/vectordb_bench/backend/clients/milvus/milvus.py index 45fe7269..4015eb1f 100644 --- a/vectordb_bench/backend/clients/milvus/milvus.py +++ b/vectordb_bench/backend/clients/milvus/milvus.py @@ -138,26 +138,7 @@ def wait_index(): log.warning(f"{self.name} optimize error: {e}") raise e from None - def ready_to_load(self): - assert self.col, "Please call self.init() before" - self._pre_load(self.col) - - def _pre_load(self, coll: Collection): - try: - if not coll.has_index(index_name=self._index_name): - log.info(f"{self.name} create index") - coll.create_index( - self._vector_field, - self.case_config.index_param(), - index_name=self._index_name, - ) - coll.load() - log.info(f"{self.name} load") - except Exception as e: - log.warning(f"{self.name} pre load error: {e}") - raise e from None - - def optimize(self): + def optimize(self, data_size: int | None = None): assert self.col, "Please call self.init() before" self._optimize() diff --git a/vectordb_bench/backend/clients/pgdiskann/pgdiskann.py b/vectordb_bench/backend/clients/pgdiskann/pgdiskann.py index c2197290..8bede0f0 100644 --- a/vectordb_bench/backend/clients/pgdiskann/pgdiskann.py +++ b/vectordb_bench/backend/clients/pgdiskann/pgdiskann.py @@ -143,10 +143,7 @@ def _drop_table(self): ) self.conn.commit() - def ready_to_load(self): - pass - - def optimize(self): + def optimize(self, data_size: int | None = None): self._post_insert() def _post_insert(self): diff --git a/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py b/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py index 64e95a1b..3006b861 100644 --- a/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py +++ b/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py @@ -153,10 +153,7 @@ def _drop_table(self): ) self.conn.commit() - def ready_to_load(self): - pass - - def optimize(self): + def 
optimize(self, data_size: int | None = None): self._post_insert() def _post_insert(self): diff --git a/vectordb_bench/backend/clients/pgvector/pgvector.py b/vectordb_bench/backend/clients/pgvector/pgvector.py index bd024175..4164461f 100644 --- a/vectordb_bench/backend/clients/pgvector/pgvector.py +++ b/vectordb_bench/backend/clients/pgvector/pgvector.py @@ -228,10 +228,7 @@ def _drop_table(self): ) self.conn.commit() - def ready_to_load(self): - pass - - def optimize(self): + def optimize(self, data_size: int | None = None): self._post_insert() def _post_insert(self): diff --git a/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py b/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py index ca7d809b..3985c071 100644 --- a/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py +++ b/vectordb_bench/backend/clients/pgvectorscale/pgvectorscale.py @@ -143,10 +143,7 @@ def _drop_table(self): ) self.conn.commit() - def ready_to_load(self): - pass - - def optimize(self): + def optimize(self, data_size: int | None = None): self._post_insert() def _post_insert(self): diff --git a/vectordb_bench/backend/clients/pinecone/pinecone.py b/vectordb_bench/backend/clients/pinecone/pinecone.py index c59ee876..1a681b33 100644 --- a/vectordb_bench/backend/clients/pinecone/pinecone.py +++ b/vectordb_bench/backend/clients/pinecone/pinecone.py @@ -59,10 +59,7 @@ def init(self): self.index = pc.Index(self.index_name) yield - def ready_to_load(self): - pass - - def optimize(self): + def optimize(self, data_size: int | None = None): pass def insert_embeddings( diff --git a/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py b/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py index a0d146a7..5de72798 100644 --- a/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py +++ b/vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py @@ -62,10 +62,7 @@ def init(self) -> None: self.qdrant_client = None del self.qdrant_client - def 
ready_to_load(self): - pass - - def optimize(self): + def optimize(self, data_size: int | None = None): assert self.qdrant_client, "Please call self.init() before" # wait for vectors to be fully indexed try: diff --git a/vectordb_bench/backend/clients/redis/redis.py b/vectordb_bench/backend/clients/redis/redis.py index 139850d2..ef0aad9a 100644 --- a/vectordb_bench/backend/clients/redis/redis.py +++ b/vectordb_bench/backend/clients/redis/redis.py @@ -95,10 +95,7 @@ def init(self) -> None: def ready_to_search(self) -> bool: """Check if the database is ready to search.""" - def ready_to_load(self) -> bool: - pass - - def optimize(self) -> None: + def optimize(self, data_size: int | None = None): pass def insert_embeddings( diff --git a/vectordb_bench/backend/clients/test/test.py b/vectordb_bench/backend/clients/test/test.py index ee5a523f..d2bcb74b 100644 --- a/vectordb_bench/backend/clients/test/test.py +++ b/vectordb_bench/backend/clients/test/test.py @@ -33,10 +33,7 @@ def init(self) -> Generator[None, None, None]: yield - def ready_to_load(self) -> bool: - return True - - def optimize(self) -> None: + def optimize(self, data_size: int | None = None): pass def insert_embeddings( diff --git a/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py b/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py index b42f70af..aa4368bb 100644 --- a/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py +++ b/vectordb_bench/backend/clients/weaviate_cloud/weaviate_cloud.py @@ -67,10 +67,7 @@ def init(self) -> None: self.client = None del self.client - def ready_to_load(self): - """Should call insert first, do nothing""" - - def optimize(self): + def optimize(self, data_size: int | None = None): assert self.client.schema.exists(self.collection_name) self.client.schema.update_config( self.collection_name, diff --git a/vectordb_bench/backend/runner/read_write_runner.py b/vectordb_bench/backend/runner/read_write_runner.py index d7584459..eaba51f5 100644 
--- a/vectordb_bench/backend/runner/read_write_runner.py +++ b/vectordb_bench/backend/runner/read_write_runner.py @@ -80,7 +80,7 @@ def run_optimize(self): """Optimize needs to run in differenct process for pymilvus schema recursion problem""" with self.db.init(): log.info("Search after write - Optimize start") - self.db.optimize() + self.db.optimize(data_size=self.data_volume) log.info("Search after write - Optimize finished") def run_search(self): diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py index 08d42e14..36564113 100644 --- a/vectordb_bench/backend/runner/serial_runner.py +++ b/vectordb_bench/backend/runner/serial_runner.py @@ -68,7 +68,7 @@ def task(self) -> int: log.info( f"({mp.current_process().name:16}) Finish loading all dataset into VectorDB, " - f"dur={time.perf_counter()-start}" + f"dur={time.perf_counter() - start}" ) return count @@ -156,8 +156,6 @@ def run_endlessness(self) -> int: start_time = time.perf_counter() max_load_count, times = 0, 0 try: - with self.db.init(): - self.db.ready_to_load() while time.perf_counter() - start_time < self.timeout: count = self.endless_insert_data( all_embeddings, diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py index e8be9f07..2a583b4f 100644 --- a/vectordb_bench/backend/task_runner.py +++ b/vectordb_bench/backend/task_runner.py @@ -234,13 +234,13 @@ def _conc_search(self): self.stop() @utils.time_it - def _task(self) -> None: + def _optimize_task(self) -> None: with self.db.init(): - self.db.optimize_with_size(data_size=self.ca.dataset.data.size) + self.db.optimize(data_size=self.ca.dataset.data.size) def _optimize(self) -> float: with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor: - future = executor.submit(self._task) + future = executor.submit(self._optimize_task) try: return future.result(timeout=self.ca.optimize_timeout)[1] except TimeoutError as e: