Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Unify optimize and remove ready_to_load #448

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,7 @@ def need_normalize_cosine(self) -> bool:

return False

def optimize(self):
pass

def optimize_with_size(self, data_size: int):
def optimize(self, data_size: int):
log.info(f"optimize count: {data_size}")
retry_times = 0
while True:
Expand All @@ -340,6 +337,3 @@ def optimize_with_size(self, data_size: int):
if total_count == data_size:
log.info("optimize table finish.")
return

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/alloydb/alloydb.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
Expand Down
23 changes: 8 additions & 15 deletions vectordb_bench/backend/clients/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ def __init__(
@contextmanager
def init(self) -> None:
"""create and destory connections to database.
Why contextmanager:

In multiprocessing search tasks, vectordbbench might init
totally hundreds of thousands of connections with DB server.

Too many connections may drain local FDs or server connection resources.
If the DB client doesn't have a `close()` method, just set the object to None.

Examples:
>>> with self.init():
Expand Down Expand Up @@ -187,9 +194,8 @@ def search_embedding(
"""
raise NotImplementedError

# TODO: remove
@abstractmethod
def optimize(self):
def optimize(self, data_size: int | None = None):
"""optimize will be called between insertion and search in performance cases.

Should be blocked until the vectorDB is ready to be tested on
Expand All @@ -199,16 +205,3 @@ def optimize(self):
Optimize's execution time is limited, the limited time is based on cases.
"""
raise NotImplementedError

def optimize_with_size(self, data_size: int):
self.optimize()

# TODO: remove
@abstractmethod
def ready_to_load(self):
"""ready_to_load will be called before load in load cases.

Should be blocked until the vectorDB is ready to be tested on
heavy load cases.
"""
raise NotImplementedError
11 changes: 4 additions & 7 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ def search_embedding(
docvalue_fields=[self.id_col_name],
stored_fields="_none_",
)
log.info(f'Search took: {resp["took"]}')
log.info(f'Search shards: {resp["_shards"]}')
log.info(f'Search hits total: {resp["hits"]["total"]}')
log.info(f"Search took: {resp['took']}")
log.info(f"Search shards: {resp['_shards']}")
log.info(f"Search hits total: {resp['hits']['total']}")
return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]]
except Exception as e:
log.warning(f"Failed to search: {self.index_name} error: {e!s}")
raise e from None

def optimize(self):
def optimize(self, data_size: int | None = None):
"""optimize will be called between insertion and search in performance cases."""
# Call refresh first to ensure that all segments are created
self._refresh_index()
Expand Down Expand Up @@ -194,6 +194,3 @@ def _load_graphs_to_memory(self):
log.info("Calling warmup API to load graphs into memory")
warmup_endpoint = f"/_plugins/_knn/warmup/{self.index_name}"
self.client.transport.perform_request("GET", warmup_endpoint)

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/chroma/chroma.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ def init(self) -> None:
def ready_to_search(self) -> bool:
pass

def ready_to_load(self) -> bool:
pass

def optimize(self) -> None:
def optimize(self, data_size: int | None = None):
pass

def insert_embeddings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def search_embedding(
log.warning(f"Failed to search: {self.indice} error: {e!s}")
raise e from None

def optimize(self):
def optimize(self, data_size: int | None = None):
"""optimize will be called between insertion and search in performance cases."""
assert self.client is not None, "should self.init() first"
self.client.indices.refresh(index=self.indice)
Expand All @@ -158,6 +158,3 @@ def optimize(self):
task_status = self.client.tasks.get(task_id=force_merge_task_id)
if task_status["completed"]:
return

def ready_to_load(self):
"""ready_to_load will be called before load in load cases."""
7 changes: 2 additions & 5 deletions vectordb_bench/backend/clients/memorydb/memorydb.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,14 @@ def init(self) -> Generator[None, None, None]:
self.conn = self.get_client()
search_param = self.case_config.search_param()
if search_param["ef_runtime"]:
self.ef_runtime_str = f'EF_RUNTIME {search_param["ef_runtime"]}'
self.ef_runtime_str = f"EF_RUNTIME {search_param['ef_runtime']}"
else:
self.ef_runtime_str = ""
yield
self.conn.close()
self.conn = None

def ready_to_load(self) -> bool:
pass

def optimize(self) -> None:
def optimize(self, data_size: int | None = None):
self._post_insert()

def insert_embeddings(
Expand Down
21 changes: 1 addition & 20 deletions vectordb_bench/backend/clients/milvus/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,26 +138,7 @@ def wait_index():
log.warning(f"{self.name} optimize error: {e}")
raise e from None

def ready_to_load(self):
assert self.col, "Please call self.init() before"
self._pre_load(self.col)

def _pre_load(self, coll: Collection):
try:
if not coll.has_index(index_name=self._index_name):
log.info(f"{self.name} create index")
coll.create_index(
self._vector_field,
self.case_config.index_param(),
index_name=self._index_name,
)
coll.load()
log.info(f"{self.name} load")
except Exception as e:
log.warning(f"{self.name} pre load error: {e}")
raise e from None

def optimize(self):
def optimize(self, data_size: int | None = None):
assert self.col, "Please call self.init() before"
self._optimize()

Expand Down
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/pgdiskann/pgdiskann.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
Expand Down
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
Expand Down
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/pgvector/pgvector.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,7 @@ def _drop_table(self):
)
self.conn.commit()

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
self._post_insert()

def _post_insert(self):
Expand Down
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/pinecone/pinecone.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ def init(self):
self.index = pc.Index(self.index_name)
yield

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
pass

def insert_embeddings(
Expand Down
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ def init(self) -> None:
self.qdrant_client = None
del self.qdrant_client

def ready_to_load(self):
pass

def optimize(self):
def optimize(self, data_size: int | None = None):
assert self.qdrant_client, "Please call self.init() before"
# wait for vectors to be fully indexed
try:
Expand Down
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/redis/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,7 @@ def init(self) -> None:
def ready_to_search(self) -> bool:
"""Check if the database is ready to search."""

def ready_to_load(self) -> bool:
pass

def optimize(self) -> None:
def optimize(self, data_size: int | None = None):
pass

def insert_embeddings(
Expand Down
5 changes: 1 addition & 4 deletions vectordb_bench/backend/clients/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ def init(self) -> Generator[None, None, None]:

yield

def ready_to_load(self) -> bool:
return True

def optimize(self) -> None:
def optimize(self, data_size: int | None = None):
pass

def insert_embeddings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ def init(self) -> None:
self.client = None
del self.client

def ready_to_load(self):
"""Should call insert first, do nothing"""

def optimize(self):
def optimize(self, data_size: int | None = None):
assert self.client.schema.exists(self.collection_name)
self.client.schema.update_config(
self.collection_name,
Expand Down
2 changes: 1 addition & 1 deletion vectordb_bench/backend/runner/read_write_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def run_optimize(self):
"""Optimize needs to run in differenct process for pymilvus schema recursion problem"""
with self.db.init():
log.info("Search after write - Optimize start")
self.db.optimize()
self.db.optimize(data_size=self.data_volume)
log.info("Search after write - Optimize finished")

def run_search(self):
Expand Down
4 changes: 1 addition & 3 deletions vectordb_bench/backend/runner/serial_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def task(self) -> int:

log.info(
f"({mp.current_process().name:16}) Finish loading all dataset into VectorDB, "
f"dur={time.perf_counter()-start}"
f"dur={time.perf_counter() - start}"
)
return count

Expand Down Expand Up @@ -156,8 +156,6 @@ def run_endlessness(self) -> int:
start_time = time.perf_counter()
max_load_count, times = 0, 0
try:
with self.db.init():
self.db.ready_to_load()
while time.perf_counter() - start_time < self.timeout:
count = self.endless_insert_data(
all_embeddings,
Expand Down
6 changes: 3 additions & 3 deletions vectordb_bench/backend/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,13 @@ def _conc_search(self):
self.stop()

@utils.time_it
def _task(self) -> None:
def _optimize_task(self) -> None:
with self.db.init():
self.db.optimize_with_size(data_size=self.ca.dataset.data.size)
self.db.optimize(data_size=self.ca.dataset.data.size)

def _optimize(self) -> float:
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(self._task)
future = executor.submit(self._optimize_task)
try:
return future.result(timeout=self.ca.optimize_timeout)[1]
except TimeoutError as e:
Expand Down
Loading