Commit

enhance: Unify optimize and remove ready_to_load

PyMilvus used to be the only client that used ready_to_load.
Now it loads the collection when creating it, so
this PR removes `ready_to_load` from the client API.

This PR also enhances optimize and removes optimize_with_size.

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn committed Jan 13, 2025
1 parent dd5b162 commit 49d9673
Showing 20 changed files with 33 additions and 109 deletions.
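At a glance, the commit collapses the three client hooks into a single one. The sketch below is illustrative only: ExampleClient is hypothetical, and the base class is assumed to be the VectorDB ABC defined in vectordb_bench/backend/clients/api.py (see its diff below).

# Rough sketch of the unified hook after this commit (not part of the diff).
# Before: clients implemented ready_to_load(), optimize(), and optimize_with_size(data_size).
# After: a single optimize(data_size) is called between insertion and search.
from abc import ABC, abstractmethod


class VectorDB(ABC):  # assumed name of the abstract base in api.py
    @abstractmethod
    def optimize(self, data_size: int | None = None):
        """Block until the database is ready to be searched."""
        raise NotImplementedError


class ExampleClient(VectorDB):  # hypothetical client, for illustration only
    def optimize(self, data_size: int | None = None):
        # A real client would build or refresh its index here; data_size lets it
        # poll until the reported row count matches what was inserted.
        pass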
@@ -325,10 +325,7 @@ def need_normalize_cosine(self) -> bool:

return False

def optimize(self):
pass

def optimize_with_size(self, data_size: int):
def optimize(self, data_size: int):
log.info(f"optimize count: {data_size}")
retry_times = 0
while True:
@@ -340,6 +337,3 @@ def optimize_with_size(self, data_size: int):
if total_count == data_size:
log.info("optimize table finish.")
return

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/alloydb/alloydb.py
@@ -149,10 +149,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
23 changes: 8 additions & 15 deletions vectordb_bench/backend/clients/api.py
@@ -137,6 +137,13 @@ def __init__(
@contextmanager
def init(self) -> None:
"""create and destory connections to database.
Why contextmanager:
In multiprocessing search tasks, vectordbbench might init
totally hundreds of thousands of connections with DB server.
Too many connections may drain local FDs or server connection resources.
If the DB client doesn't have `close()` method, just set the object to None.
Examples:
>>> with self.init():
@@ -187,9 +194,8 @@ def search_embedding(
"""
raise NotImplementedError

# TODO: remove
@abstractmethod
def optimize(self):
def optimize(self, data_size: int | None = None):
"""optimize will be called between insertion and search in performance cases.
Should be blocked until the vectorDB is ready to be tested on
@@ -199,16 +205,3 @@ def optimize(self):
Optimize's execution time is limited, the limited time is based on cases.
"""
raise NotImplementedError

def optimize_with_size(self, data_size: int):
self.optimize()

# TODO: remove
@abstractmethod
def ready_to_load(self):
"""ready_to_load will be called before load in load cases.
Should be blocked until the vectorDB is ready to be tested on
heavy load cases.
"""
raise NotImplementedError
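For callers, the contract becomes a single call inside the init() context described above. A minimal sketch of that calling convention (the helper name is illustrative; the real runners pass self.data_volume or the dataset size, as shown further down):

def run_optimize_stage(db, data_size: int) -> None:
    # Open and close the connection via the contextmanager, then block
    # until the inserted data is fully indexed and searchable.
    with db.init():
        db.optimize(data_size=data_size)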
11 changes: 4 additions & 7 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
@@ -145,15 +145,15 @@ def search_embedding(
docvalue_fields=[self.id_col_name],
stored_fields="_none_",
)
log.info(f'Search took: {resp["took"]}')
log.info(f'Search shards: {resp["_shards"]}')
log.info(f'Search hits total: {resp["hits"]["total"]}')
log.info(f"Search took: {resp['took']}")
log.info(f"Search shards: {resp['_shards']}")
log.info(f"Search hits total: {resp['hits']['total']}")
return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]]
except Exception as e:
log.warning(f"Failed to search: {self.index_name} error: {e!s}")
raise e from None

def optimize(self):
def optimize(self, data_size: int | None = None):
"""optimize will be called between insertion and search in performance cases."""
# Call refresh first to ensure that all segments are created
self._refresh_index()
@@ -194,6 +194,3 @@ def _load_graphs_to_memory(self):
log.info("Calling warmup API to load graphs into memory")
warmup_endpoint = f"/_plugins/_knn/warmup/{self.index_name}"
self.client.transport.perform_request("GET", warmup_endpoint)

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/chroma/chroma.py
@@ -57,10 +57,7 @@ def init(self) -> None:
def ready_to_search(self) -> bool:
pass

def ready_to_load(self) -> bool:
pass

def optimize(self) -> None:
def optimize(self, data_size: int | None = None):
pass

def insert_embeddings(
@@ -143,7 +143,7 @@ def search_embedding(
log.warning(f"Failed to search: {self.indice} error: {e!s}")
raise e from None

def optimize(self):
def optimize(self, data_size: int | None = None):
"""optimize will be called between insertion and search in performance cases."""
assert self.client is not None, "should self.init() first"
self.client.indices.refresh(index=self.indice)
@@ -158,6 +158,3 @@ def optimize(self):
task_status = self.client.tasks.get(task_id=force_merge_task_id)
if task_status["completed"]:
return

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
7 changes: 2 additions & 5 deletions vectordb_bench/backend/clients/memorydb/memorydb.py
@@ -157,17 +157,14 @@ def init(self) -> Generator[None, None, None]:
self.conn = self.get_client()
search_param = self.case_config.search_param()
if search_param["ef_runtime"]:
self.ef_runtime_str = f'EF_RUNTIME {search_param["ef_runtime"]}'
self.ef_runtime_str = f"EF_RUNTIME {search_param['ef_runtime']}"
else:
self.ef_runtime_str = ""
yield
self.conn.close()
self.conn = None

def ready_to_load(self) -> bool:
pass

def optimize(self) -> None:
def optimize(self, data_size: int | None = None):
self._post_insert()

def insert_embeddings(
21 changes: 1 addition & 20 deletions vectordb_bench/backend/clients/milvus/milvus.py
@@ -138,26 +138,7 @@ def wait_index():
log.warning(f"{self.name} optimize error: {e}")
raise e from None

def ready_to_load(self):
assert self.col, "Please call self.init() before"
self._pre_load(self.col)

def _pre_load(self, coll: Collection):
try:
if not coll.has_index(index_name=self._index_name):
log.info(f"{self.name} create index")
coll.create_index(
self._vector_field,
self.case_config.index_param(),
index_name=self._index_name,
)
coll.load()
log.info(f"{self.name} load")
except Exception as e:
log.warning(f"{self.name} pre load error: {e}")
raise e from None

def optimize(self):
def optimize(self, data_size: int | None = None):
assert self.col, "Please call self.init() before"
self._optimize()

5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/pgdiskann/pgdiskann.py
@@ -143,10 +143,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py
@@ -153,10 +153,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/pgvector/pgvector.py
@@ -228,10 +228,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
@@ -143,10 +143,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/pinecone/pinecone.py
@@ -59,10 +59,7 @@ def init(self):
self.index = pc.Index(self.index_name)
yield

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
pass

def insert_embeddings(
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py
@@ -62,10 +62,7 @@ def init(self) -> None:
self.qdrant_client = None
del self.qdrant_client

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
assert self.qdrant_client, "Please call self.init() before"
# wait for vectors to be fully indexed
try:
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/redis/redis.py
@@ -95,10 +95,7 @@ def init(self) -> None:
def ready_to_search(self) -> bool:
"""Check if the database is ready to search."""

def ready_to_load(self) -> bool:
pass

def optimize(self) -> None:
def optimize(self, data_size: int | None = None):
pass

def insert_embeddings(
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/test/test.py
@@ -33,10 +33,7 @@ def init(self) -> Generator[None, None, None]:

yield

def ready_to_load(self) -> bool:
return True

def optimize(self) -> None:
def optimize(self, data_size: int | None = None):
pass

def insert_embeddings(
@@ -67,10 +67,7 @@ def init(self) -> None:
self.client = None
del self.client

def ready_to_load(self):
"""Should call insert first, do nothing"""

def optimize(self):
def optimize(self, data_size: int | None = None):
assert self.client.schema.exists(self.collection_name)
self.client.schema.update_config(
self.collection_name,
2 changes: 1 addition & 1 deletion vectordb_bench/backend/runner/read_write_runner.py
@@ -80,7 +80,7 @@ def run_optimize(self):
"""Optimize needs to run in differenct process for pymilvus schema recursion problem"""
with self.db.init():
log.info("Search after write - Optimize start")
self.db.optimize()
self.db.optimize(data_size=self.data_volume)
log.info("Search after write - Optimize finished")

def run_search(self):
4 changes: 1 addition & 3 deletions vectordb_bench/backend/runner/serial_runner.py
@@ -68,7 +68,7 @@ def task(self) -> int:

log.info(
f"({mp.current_process().name:16}) Finish loading all dataset into VectorDB, "
f"dur={time.perf_counter()-start}"
f"dur={time.perf_counter() - start}"
)
return count

@@ -156,8 +156,6 @@ def run_endlessness(self) -> int:
start_time = time.perf_counter()
max_load_count, times = 0, 0
try:
with self.db.init():
self.db.ready_to_load()
while time.perf_counter() - start_time < self.timeout:
count = self.endless_insert_data(
all_embeddings,
6 changes: 3 additions & 3 deletions vectordb_bench/backend/task_runner.py
@@ -234,13 +234,13 @@ def _conc_search(self):
self.stop()

@utils.time_it
def _task(self) -> None:
def _optimize_task(self) -> None:
with self.db.init():
self.db.optimize_with_size(data_size=self.ca.dataset.data.size)
self.db.optimize(data_size=self.ca.dataset.data.size)

def _optimize(self) -> float:
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(self._task)
future = executor.submit(self._optimize_task)
try:
return future.result(timeout=self.ca.optimize_timeout)[1]
except TimeoutError as e:
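The runner executes the optimize stage in a separate worker process with a hard timeout (as the run_optimize docstring above notes, to sidestep a pymilvus schema recursion problem, and because optimize time is bounded per case). A rough standalone sketch of that pattern, with illustrative names in place of the runner's self.db and self.ca:

import concurrent.futures
import time


def optimize_task(data_size: int) -> float:
    # Stand-in for "with db.init(): db.optimize(data_size=...)"; returns its duration.
    start = time.perf_counter()
    time.sleep(0.1)  # pretend to build or compact the index
    return time.perf_counter() - start


def run_optimize_with_timeout(data_size: int, timeout: float) -> float:
    # A single-worker process pool isolates client-side state from the main
    # process and lets the benchmark enforce a time limit on optimize.
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(optimize_task, data_size)
        return future.result(timeout=timeout)  # raises TimeoutError when exceeded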
