From 300739becce6c965db35739838d9cfbdd8299d29 Mon Sep 17 00:00:00 2001 From: "min.tian" Date: Thu, 9 Jan 2025 12:09:06 +0800 Subject: [PATCH] Streaming Cases - Modified the 'search_stage' based on the end of the insertion process rather than the start. - Errors occurring in the search sub-process will no longer interrupt the testing; they will only impact the metrics for that specific stage. - Only errors from the insert sub-process will halt the overall testing. - During concurrent search testing, database errors will not terminate the search test. QPS metrics will now only count successful requests. - Updated the front-end to allow customization of case_config via the UI. - Implemented a new page ('/streaming') to display results for streaming cases. Signed-off-by: min.tian --- vectordb_bench/__init__.py | 2 + vectordb_bench/backend/assembler.py | 2 + vectordb_bench/backend/cases.py | 70 ++++- vectordb_bench/backend/dataset.py | 93 +++++-- vectordb_bench/backend/runner/__init__.py | 6 +- vectordb_bench/backend/runner/mp_runner.py | 51 ++-- vectordb_bench/backend/runner/rate_runner.py | 119 ++++---- .../backend/runner/read_write_runner.py | 170 +++++++----- .../backend/runner/serial_runner.py | 41 ++- vectordb_bench/backend/task_runner.py | 53 +++- .../frontend/components/check_results/data.py | 17 +- .../components/check_results/filters.py | 40 +-- .../components/check_results/headerIcon.py | 4 +- .../components/run_test/caseSelector.py | 103 ++++--- .../components/run_test/inputWidget.py | 46 ++++ .../components/run_test/submitTask.py | 4 +- .../frontend/components/streaming/charts.py | 257 ++++++++++++++++++ .../frontend/components/streaming/data.py | 62 +++++ .../frontend/components/tables/data.py | 2 +- .../frontend/config/dbCaseConfigs.py | 207 ++++++++++---- vectordb_bench/frontend/config/styles.py | 21 +- vectordb_bench/frontend/pages/concurrent.py | 2 +- vectordb_bench/frontend/pages/streaming.py | 129 +++++++++ vectordb_bench/interface.py | 2 +- vectordb_bench/metric.py | 16 +- vectordb_bench/models.py | 33 ++- 26 files changed, 1215 insertions(+), 337 deletions(-) create mode 100644 vectordb_bench/frontend/components/run_test/inputWidget.py create mode 100644 vectordb_bench/frontend/components/streaming/charts.py create mode 100644 vectordb_bench/frontend/components/streaming/data.py create mode 100644 vectordb_bench/frontend/pages/streaming.py diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py index c07fc855d..37e8d470f 100644 --- a/vectordb_bench/__init__.py +++ b/vectordb_bench/__init__.py @@ -66,6 +66,7 @@ class config: CAPACITY_TIMEOUT_IN_SECONDS = 24 * 3600 # 24h LOAD_TIMEOUT_DEFAULT = 24 * 3600 # 24h + LOAD_TIMEOUT_768D_100K = 24 * 3600 # 24h LOAD_TIMEOUT_768D_1M = 24 * 3600 # 24h LOAD_TIMEOUT_768D_10M = 240 * 3600 # 10d LOAD_TIMEOUT_768D_100M = 2400 * 3600 # 100d @@ -74,6 +75,7 @@ class config: LOAD_TIMEOUT_1536D_5M = 240 * 3600 # 10d OPTIMIZE_TIMEOUT_DEFAULT = 24 * 3600 # 24h + OPTIMIZE_TIMEOUT_768D_100K = 24 * 3600 # 24h OPTIMIZE_TIMEOUT_768D_1M = 24 * 3600 # 24h OPTIMIZE_TIMEOUT_768D_10M = 240 * 3600 # 10d OPTIMIZE_TIMEOUT_768D_100M = 2400 * 3600 # 100d diff --git a/vectordb_bench/backend/assembler.py b/vectordb_bench/backend/assembler.py index b81d315c2..d945c2997 100644 --- a/vectordb_bench/backend/assembler.py +++ b/vectordb_bench/backend/assembler.py @@ -39,6 +39,7 @@ def assemble_all( runners = [cls.assemble(run_id, task, source) for task in tasks] load_runners = [r for r in runners if r.ca.label == CaseLabel.Load] perf_runners = [r for r in runners if r.ca.label == CaseLabel.Performance] + streaming_runners = [r for r in runners if r.ca.label == CaseLabel.Streaming] # group by db db2runner = {} @@ -58,6 +59,7 @@ def assemble_all( all_runners = [] all_runners.extend(load_runners) + all_runners.extend(streaming_runners) for v in db2runner.values(): all_runners.extend(v) diff --git a/vectordb_bench/backend/cases.py b/vectordb_bench/backend/cases.py index 15fc069cc..8b48441cf 100644 --- a/vectordb_bench/backend/cases.py +++ b/vectordb_bench/backend/cases.py @@ -1,3 +1,4 @@ +import json import logging from enum import Enum, auto @@ -8,7 +9,7 @@ CustomDatasetConfig, ) -from .dataset import CustomDataset, Dataset, DatasetManager +from .dataset import CustomDataset, Dataset, DatasetManager, DatasetWithSizeType log = logging.getLogger(__name__) @@ -47,6 +48,8 @@ class CaseType(Enum): Custom = 100 PerformanceCustomDataset = 101 + StreamingPerformanceCase = 200 + def case_cls(self, custom_configs: dict | None = None) -> type["Case"]: if custom_configs is None: return type2case.get(self)() @@ -68,6 +71,7 @@ def case_description(self, custom_configs: dict | None = None) -> str: class CaseLabel(Enum): Load = auto() Performance = auto() + Streaming = auto() class Case(BaseModel): @@ -87,7 +91,7 @@ class Case(BaseModel): description: str dataset: DatasetManager - load_timeout: float | int + load_timeout: float | int | None = None optimize_timeout: float | int | None = None filter_rate: float | None = None @@ -104,14 +108,14 @@ def filters(self) -> dict | None: return None -class CapacityCase(Case, BaseModel): +class CapacityCase(Case): label: CaseLabel = CaseLabel.Load filter_rate: float | None = None load_timeout: float | int = config.CAPACITY_TIMEOUT_IN_SECONDS optimize_timeout: float | int | None = None -class PerformanceCase(Case, BaseModel): +class PerformanceCase(Case): label: CaseLabel = CaseLabel.Performance filter_rate: float | None = None load_timeout: float | int = config.LOAD_TIMEOUT_DEFAULT @@ -349,6 +353,63 @@ def __init__( ) +Streaming_Insert_Rate_Step = 100 + + +class StreamingPerformanceCase(Case): + case_id: CaseType = CaseType.StreamingPerformanceCase + label: CaseLabel = CaseLabel.Streaming + dataset_with_size_type: DatasetWithSizeType + insert_rate: int + search_stages: list[float] + concurrencies: list[int] + optimize_after_write: bool = True + read_dur_after_write: int = 30 + + def __init__( + self, + dataset_with_size_type: DatasetWithSizeType | str = DatasetWithSizeType.CohereSmall.value, + insert_rate: int = 500, + search_stages: list[float] | str = (0.5, 0.8), + concurrencies: list[int] | str = (5, 10), + **kwargs, + ): + if insert_rate % config.NUM_PER_BATCH != 0: + _insert_rate = max( + Streaming_Insert_Rate_Step, + insert_rate // Streaming_Insert_Rate_Step * Streaming_Insert_Rate_Step, + ) + log.warning( + f"[streaming_case init] insert_rate(={insert_rate}) should be " + f"divisible by {Streaming_Insert_Rate_Step}), reset to {_insert_rate}", + ) + insert_rate = _insert_rate + if not isinstance(dataset_with_size_type, DatasetWithSizeType): + dataset_with_size_type = DatasetWithSizeType(dataset_with_size_type) + dataset = dataset_with_size_type.get_manager() + name = f"Streaming-Perf - {dataset_with_size_type.value}, {insert_rate} rows/s" + description = ( + "This case tests the search performance of vector database while maintaining " + f"a fixed insertion speed. (dataset: {dataset_with_size_type.value})" + ) + + if isinstance(search_stages, str): + search_stages = json.loads(search_stages) + if isinstance(concurrencies, str): + concurrencies = json.loads(concurrencies) + + super().__init__( + name=name, + description=description, + dataset=dataset, + dataset_with_size_type=dataset_with_size_type, + insert_rate=insert_rate, + search_stages=search_stages, + concurrencies=concurrencies, + **kwargs, + ) + + type2case = { CaseType.CapacityDim960: CapacityDim960, CaseType.CapacityDim128: CapacityDim128, @@ -367,4 +428,5 @@ def __init__( CaseType.Performance1536D5M99P: Performance1536D5M99P, CaseType.Performance1536D50K: Performance1536D50K, CaseType.PerformanceCustomDataset: PerformanceCustomDataset, + CaseType.StreamingPerformanceCase: StreamingPerformanceCase, } diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py index 62700b0fa..7c0307090 100644 --- a/vectordb_bench/backend/dataset.py +++ b/vectordb_bench/backend/dataset.py @@ -39,6 +39,15 @@ class BaseDataset(BaseModel): with_gt: bool = False _size_label: dict[int, SizeLabel] = PrivateAttr() is_custom: bool = False + with_remote_resource: bool = True + with_scalar_labels: bool = False + train_id_field: str = "id" + train_vector_field: str = "emb" + test_file: str = "test.parquet" + test_id_field: str = "id" + test_vector_field: str = "emb" + gt_id_field: str = "id" + gt_neighbors_field: str = "neighbors_id" @validator("size") def verify_size(cls, v: int): @@ -51,6 +60,10 @@ def verify_size(cls, v: int): def label(self) -> str: return self._size_label.get(self.size).label + @property + def full_name(self) -> str: + return f"{self.name.capitalize()} ({self.label.capitalize()})" + @property def dir_name(self) -> str: return f"{self.name}_{self.label}_{utils.numerize(self.size)}".lower() @@ -59,11 +72,16 @@ def dir_name(self) -> str: def file_count(self) -> int: return self._size_label.get(self.size).file_count + @property + def train_files(self) -> list[str]: + return utils.compose_train_files(self.file_count, self.use_shuffled) + class CustomDataset(BaseDataset): dir: str file_num: int is_custom: bool = True + with_remote_resource: bool = False @validator("size") def verify_size(cls, v: int): @@ -109,7 +127,7 @@ class Cohere(BaseDataset): dim: int = 768 metric_type: MetricType = MetricType.COSINE use_shuffled: bool = config.USE_SHUFFLED_DATA - with_gt: bool = (True,) + with_gt: bool = True _size_label: dict = { 100_000: SizeLabel(100_000, "SMALL", 1), 1_000_000: SizeLabel(1_000_000, "MEDIUM", 1), @@ -146,7 +164,7 @@ class OpenAI(BaseDataset): dim: int = 1536 metric_type: MetricType = MetricType.COSINE use_shuffled: bool = config.USE_SHUFFLED_DATA - with_gt: bool = (True,) + with_gt: bool = True _size_label: dict = { 50_000: SizeLabel(50_000, "SMALL", 1), 500_000: SizeLabel(500_000, "MEDIUM", 1), @@ -166,8 +184,8 @@ class DatasetManager(BaseModel): """ data: BaseDataset - test_data: pd.DataFrame | None = None - gt_data: pd.DataFrame | None = None + test_data: list[list[float]] | None = None + gt_data: list[list[int]] | None = None train_files: list[str] = [] reader: DatasetReader | None = None @@ -191,7 +209,7 @@ def data_dir(self) -> pathlib.Path: return pathlib.Path( config.DATASET_LOCAL_DIR, self.data.name.lower(), - self.data.dir_name.lower(), + self.data.dir_name, ) def __iter__(self): @@ -215,29 +233,24 @@ def prepare( bool: whether the dataset is successfully prepared """ - file_count, use_shuffled = self.data.file_count, self.data.use_shuffled - - train_files = utils.compose_train_files(file_count, use_shuffled) - all_files = train_files - + self.train_files = self.data.train_files gt_file, test_file = None, None if self.data.with_gt: - gt_file, test_file = utils.compose_gt_file(filters), "test.parquet" - all_files.extend([gt_file, test_file]) + gt_file, test_file = utils.compose_gt_file(filters), self.data.test_file - if not self.data.is_custom: + if self.data.with_remote_resource: + download_files = [file for file in self.train_files] + download_files.extend([gt_file, test_file]) source.reader().read( dataset=self.data.dir_name.lower(), - files=all_files, + files=download_files, local_ds_root=self.data_dir, ) if gt_file is not None and test_file is not None: - self.test_data = self._read_file(test_file) - self.gt_data = self._read_file(gt_file) + self.test_data = self._read_file(test_file)[self.data.test_vector_field].to_list() + self.gt_data = self._read_file(gt_file)[self.data.gt_neighbors_field].to_list() - prefix = "shuffle_train" if use_shuffled else "train" - self.train_files = sorted([f.name for f in self.data_dir.glob(f"{prefix}*.parquet")]) log.debug(f"{self.data.name}: available train files {self.train_files}") return True @@ -313,3 +326,47 @@ def get(self, size: int) -> BaseDataset: def manager(self, size: int) -> DatasetManager: return DatasetManager(data=self.get(size)) + + +class DatasetWithSizeType(Enum): + CohereSmall = "Small Cohere (768dim, 100K)" + CohereMedium = "Medium Cohere (768dim, 1M)" + CohereLarge = "Large Cohere (768dim, 10M)" + OpenAISmall = "Small OpenAI (1536dim, 50K)" + OpenAIMedium = "Medium OpenAI (1536dim, 500K)" + OpenAILarge = "Large OpenAI (1536dim, 5M)" + + def get_manager(self) -> DatasetManager: + if self not in DatasetWithSizeMap: + msg = f"wrong ScalarDatasetWithSizeType: {self.name}" + raise ValueError(msg) + return DatasetWithSizeMap.get(self) + + def get_load_timeout(self) -> float: + if "small" in self.value.lower(): + return config.LOAD_TIMEOUT_768D_100K + if "medium" in self.value.lower(): + return config.LOAD_TIMEOUT_768D_1M + if "large" in self.value.lower(): + return config.LOAD_TIMEOUT_768D_10M + msg = f"No load_timeout for {self.value}" + raise KeyError(msg) + + def get_optimize_timeout(self) -> float: + if "small" in self.value.lower(): + return config.OPTIMIZE_TIMEOUT_768D_100K + if "medium" in self.value.lower(): + return config.OPTIMIZE_TIMEOUT_768D_1M + if "large" in self.value.lower(): + return config.OPTIMIZE_TIMEOUT_768D_10M + return config.OPTIMIZE_TIMEOUT_DEFAULT + + +DatasetWithSizeMap = { + DatasetWithSizeType.CohereSmall: Dataset.COHERE.manager(100_000), + DatasetWithSizeType.CohereMedium: Dataset.COHERE.manager(1_000_000), + DatasetWithSizeType.CohereLarge: Dataset.COHERE.manager(10_000_000), + DatasetWithSizeType.OpenAISmall: Dataset.OPENAI.manager(50_000), + DatasetWithSizeType.OpenAIMedium: Dataset.OPENAI.manager(500_000), + DatasetWithSizeType.OpenAILarge: Dataset.OPENAI.manager(5_000_000), +} diff --git a/vectordb_bench/backend/runner/__init__.py b/vectordb_bench/backend/runner/__init__.py index b83df6f99..ceb980a98 100644 --- a/vectordb_bench/backend/runner/__init__.py +++ b/vectordb_bench/backend/runner/__init__.py @@ -1,10 +1,10 @@ -from .mp_runner import ( - MultiProcessingSearchRunner, -) +from .mp_runner import MultiProcessingSearchRunner +from .read_write_runner import ReadWriteRunner from .serial_runner import SerialInsertRunner, SerialSearchRunner __all__ = [ "MultiProcessingSearchRunner", "SerialInsertRunner", "SerialSearchRunner", + "ReadWriteRunner", ] diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py index 5b69b5481..d6ed282bb 100644 --- a/vectordb_bench/backend/runner/mp_runner.py +++ b/vectordb_bench/backend/runner/mp_runner.py @@ -67,12 +67,13 @@ def search( self.k, self.filters, ) + count += 1 + latencies.append(time.perf_counter() - s) except Exception as e: log.warning(f"VectorDB search_embedding error: {e}") - traceback.print_exc(chain=True) - raise e from None + # traceback.print_exc(chain=True) + # raise e from None - latencies.append(time.perf_counter() - s) count += 1 # loop through the test data idx = idx + 1 if idx < num - 1 else 0 @@ -180,10 +181,10 @@ def run(self) -> float: def stop(self) -> None: pass - def run_by_dur(self, duration: int) -> float: + def run_by_dur(self, duration: int) -> tuple[float, float]: return self._run_by_dur(duration) - def _run_by_dur(self, duration: int) -> float: + def _run_by_dur(self, duration: int) -> tuple[float, float]: max_qps = 0 try: for conc in self.concurrencies: @@ -211,14 +212,17 @@ def _run_by_dur(self, duration: int) -> float: ) start = time.perf_counter() - all_count = sum([r.result() for r in future_iter]) + res = [r.result() for r in future_iter] + all_success_count = sum([r[0] for r in res]) + all_failed_count = sum([r[1] for r in res]) + failed_rate = all_failed_count / (all_failed_count + all_success_count) cost = time.perf_counter() - start - qps = round(all_count / cost, 4) + qps = round(all_success_count / cost, 4) log.info( - f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}", + f"End search in concurrency {conc}: dur={cost}s, failed_rate={failed_rate}" + f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}", ) - if qps > max_qps: max_qps = qps log.info( @@ -237,15 +241,9 @@ def _run_by_dur(self, duration: int) -> float: finally: self.stop() - return max_qps + return max_qps, failed_rate - def search_by_dur( - self, - dur: int, - test_data: list[list[float]], - q: mp.Queue, - cond: mp.Condition, - ) -> int: + def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]: # sync all process q.put(1) with cond: @@ -255,7 +253,8 @@ def search_by_dur( num, idx = len(test_data), random.randint(0, len(test_data) - 1) start_time = time.perf_counter() - count = 0 + success_count = 0 + failed_cnt = 0 while time.perf_counter() < start_time + dur: s = time.perf_counter() try: @@ -264,25 +263,27 @@ def search_by_dur( self.k, self.filters, ) + success_count += 1 except Exception as e: + failed_cnt += 1 log.warning(f"VectorDB search_embedding error: {e}") - traceback.print_exc(chain=True) - raise e from None + # traceback.print_exc(chain=True) + # raise e from None - count += 1 # loop through the test data idx = idx + 1 if idx < num - 1 else 0 - if count % 500 == 0: + if success_count % 500 == 0: log.debug( - f"({mp.current_process().name:16}) search_count: {count}, ", + f"({mp.current_process().name:16}) search_count: {success_count}, " f"latest_latency={time.perf_counter()-s}", ) total_dur = round(time.perf_counter() - start_time, 4) log.debug( f"{mp.current_process().name:16} search {self.duration}s: " - f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}", + f"actual_dur={total_dur}s, count={success_count}, failed_cnt={failed_cnt}, " + f"qps (successful) in this process: {round(success_count / total_dur, 4):3}", ) - return count + return success_count, failed_cnt diff --git a/vectordb_bench/backend/runner/rate_runner.py b/vectordb_bench/backend/runner/rate_runner.py index 0145af4ce..dc2b066bc 100644 --- a/vectordb_bench/backend/runner/rate_runner.py +++ b/vectordb_bench/backend/runner/rate_runner.py @@ -5,7 +5,7 @@ from concurrent.futures import ThreadPoolExecutor from vectordb_bench import config -from vectordb_bench.backend.clients import api +from vectordb_bench.backend.clients import DB, api from vectordb_bench.backend.dataset import DataSetIterator from vectordb_bench.backend.utils import time_it @@ -13,6 +13,9 @@ log = logging.getLogger(__name__) +Time_Per_Insert_Batch = 1 # 1s +Max_Insert_Retry = 5 + class RatedMultiThreadingInsertRunner: def __init__( @@ -30,78 +33,92 @@ def __init__( self.insert_rate = rate self.batch_rate = rate // config.NUM_PER_BATCH - def send_insert_task(self, db: api.VectorDB, emb: list[list[float]], metadata: list[str]): - db.insert_embeddings(emb, metadata) + self.executing_futures = [] + self.sig_idx = 0 + + def send_insert_task(self, db: api.VectorDB, emb: list[list[float]], metadata: list[str], retry_idx: int = 0): + try: + db.insert_embeddings(emb, metadata) + except Exception as e: + log.warning(f"Insert Failed, try_idx={retry_idx}, Exception: {e}") + retry_idx += 1 + if retry_idx <= Max_Insert_Retry: + time.sleep(retry_idx) + self.send_insert_task(db, emb=emb, metadata=metadata, retry_idx=retry_idx) + else: + msg = f"Insert failed and retried more than {Max_Insert_Retry} times" + raise RuntimeError(msg) from e @time_it def run_with_rate(self, q: mp.Queue): with ThreadPoolExecutor(max_workers=mp.cpu_count()) as executor: - executing_futures = [] @time_it def submit_by_rate() -> bool: rate = self.batch_rate for data in self.dataset: emb, metadata = get_data(data, self.normalize) - executing_futures.append( - executor.submit(self.send_insert_task, self.db, emb, metadata), - ) + self.executing_futures.append(executor.submit(self.send_insert_task, self.db, emb, metadata)) rate -= 1 if rate == 0: return False return rate == self.batch_rate + def check_and_send_signal(wait_interval: float, finished: bool = False): + try: + done, not_done = concurrent.futures.wait( + self.executing_futures, + timeout=wait_interval, + return_when=concurrent.futures.FIRST_EXCEPTION, + ) + _ = [fut.result() for fut in done] + if len(not_done) > 0: + log.warning(f"[{len(not_done)}] tasks are not done, trying to wait in the next round") + self.executing_futures = list(not_done) + else: + self.executing_futures = [] + + self.sig_idx += len(done) + while self.sig_idx >= self.batch_rate: + self.sig_idx -= self.batch_rate + if self.sig_idx < self.batch_rate and len(not_done) == 0 and finished: + q.put(True, block=True) + else: + q.put(False, block=False) + + except Exception as e: + log.warning(f"task error, terminating, err={e}") + q.put(None, block=True) + executor.shutdown(wait=True, cancel_futures=True) + raise e + with self.db.init(): + start_time = time.perf_counter() + inserted_batch_cnt = 0 + while True: - start_time = time.perf_counter() finished, elapsed_time = submit_by_rate() if finished is True: - q.put(True, block=True) - log.info(f"End of dataset, left unfinished={len(executing_futures)}") + log.info(f"End of dataset, left unfinished={len(self.executing_futures)}") break - - q.put(False, block=False) - wait_interval = 1 - elapsed_time if elapsed_time < 1 else 0.001 - - try: - done, not_done = concurrent.futures.wait( - executing_futures, - timeout=wait_interval, - return_when=concurrent.futures.FIRST_EXCEPTION, + if elapsed_time >= 0.9: + log.warning( + f"Submit insert tasks took {elapsed_time}s, expected 1s, " + f"indicating potential resource limitations on the client machine.", ) + wait_interval = ( + Time_Per_Insert_Batch - elapsed_time if elapsed_time < Time_Per_Insert_Batch else 0.001 + ) - if len(not_done) > 0: - log.warning( - f"Failed to finish all tasks in 1s, [{len(not_done)}/{len(executing_futures)}] ", - f"tasks are not done, waited={wait_interval:.2f}, trying to wait in the next round", - ) - executing_futures = list(not_done) - else: - log.debug( - f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} ", - f"task in 1s, wait_interval={wait_interval:.2f}", - ) - executing_futures = [] - except Exception as e: - log.warning(f"task error, terminating, err={e}") - q.put(None, block=True) - executor.shutdown(wait=True, cancel_futures=True) - raise e from e - - dur = time.perf_counter() - start_time - if dur < 1: - time.sleep(1 - dur) + check_and_send_signal(wait_interval, finished=False) + + dur = time.perf_counter() - start_time - inserted_batch_cnt * Time_Per_Insert_Batch + if dur < Time_Per_Insert_Batch: + time.sleep(Time_Per_Insert_Batch - dur) + inserted_batch_cnt += 1 # wait for all tasks in executing_futures to complete - if len(executing_futures) > 0: - try: - done, _ = concurrent.futures.wait( - executing_futures, - return_when=concurrent.futures.FIRST_EXCEPTION, - ) - except Exception as e: - log.warning(f"task error, terminating, err={e}") - q.put(None, block=True) - executor.shutdown(wait=True, cancel_futures=True) - raise e from e + wait_interval = 1 + while len(self.executing_futures) > 0: + check_and_send_signal(wait_interval, finished=True) diff --git a/vectordb_bench/backend/runner/read_write_runner.py b/vectordb_bench/backend/runner/read_write_runner.py index e916f45d6..4efb3dd4c 100644 --- a/vectordb_bench/backend/runner/read_write_runner.py +++ b/vectordb_bench/backend/runner/read_write_runner.py @@ -1,13 +1,17 @@ import concurrent +import concurrent.futures import logging import math import multiprocessing as mp +import time from collections.abc import Iterable import numpy as np from vectordb_bench.backend.clients import api from vectordb_bench.backend.dataset import DatasetManager +from vectordb_bench.backend.utils import time_it +from vectordb_bench.metric import Metric from .mp_runner import MultiProcessingSearchRunner from .rate_runner import RatedMultiThreadingInsertRunner @@ -26,33 +30,37 @@ def __init__( k: int = 100, filters: dict | None = None, concurrencies: Iterable[int] = (1, 15, 50), - search_stage: Iterable[float] = ( + search_stages: Iterable[float] = ( 0.5, 0.6, 0.7, 0.8, 0.9, ), # search from insert portion, 0.0 means search from the start + optimize_after_write: bool = True, read_dur_after_write: int = 300, # seconds, search duration when insertion is done timeout: float | None = None, ): self.insert_rate = insert_rate self.data_volume = dataset.data.size - for stage in search_stage: + for stage in search_stages: assert 0.0 <= stage < 1.0, "each search stage should be in [0.0, 1.0)" - self.search_stage = sorted(search_stage) + self.search_stages = sorted(search_stages) + self.optimize_after_write = optimize_after_write self.read_dur_after_write = read_dur_after_write log.info( - f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, ", + f"Init runner, concurencys={concurrencies}, search_stages={self.search_stages}, " f"stage_search_dur={read_dur_after_write}", ) - test_emb = np.stack(dataset.test_data["emb"]) if normalize: + test_emb = np.array(dataset.test_data) test_emb = test_emb / np.linalg.norm(test_emb, axis=1)[:, np.newaxis] - test_emb = test_emb.tolist() + test_emb = test_emb.tolist() + else: + test_emb = dataset.test_data MultiProcessingSearchRunner.__init__( self, @@ -76,6 +84,7 @@ def __init__( k=k, ) + @time_it def run_optimize(self): """Optimize needs to run in differenct process for pymilvus schema recursion problem""" with self.db.init(): @@ -85,49 +94,90 @@ def run_optimize(self): def run_search(self): log.info("Search after write - Serial search start") + perc = 100 + test_time = round(time.perf_counter(), 4) res, ssearch_dur = self.serial_search_runner.run() recall, ndcg, p99_latency = res log.info( - f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, p99={p99_latency}, ", - f"dur={ssearch_dur:.4f}", + f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, " + f"p99={p99_latency}, dur={ssearch_dur:.4f}", ) log.info( f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}", ) - max_qps = self.run_by_dur(self.read_dur_after_write) + max_qps, conc_failed_rate = self.run_by_dur(self.read_dur_after_write) log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}") - return (max_qps, recall, ndcg, p99_latency) + return [(perc, test_time, max_qps, recall, ndcg, p99_latency, conc_failed_rate)] - def run_read_write(self): - with mp.Manager() as m: - q = m.Queue() - with concurrent.futures.ProcessPoolExecutor( - mp_context=mp.get_context("spawn"), - max_workers=2, - ) as executor: - read_write_futures = [] - read_write_futures.append(executor.submit(self.run_with_rate, q)) - read_write_futures.append(executor.submit(self.run_search_by_sig, q)) + def run_read_write(self) -> Metric: + m = Metric() + with mp.Manager() as mp_manager: + q = mp_manager.Queue() + with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context("spawn"), max_workers=2) as executor: + insert_future = executor.submit(self.run_with_rate, q) + search_future = executor.submit(self.run_search_by_sig, q) try: - for f in concurrent.futures.as_completed(read_write_futures): - res = f.result() - log.info(f"Result = {res}") + start_time = time.perf_counter() + _, m.insert_duration = insert_future.result() + search_res = search_future.result() # Wait for read_write_futures finishing and do optimize and search - op_future = executor.submit(self.run_optimize) - op_future.result() + if self.optimize_after_write: + op_future = executor.submit(self.run_optimize) + _, m.optimize_duration = op_future.result() + log.info(f"Optimize cost {m.optimize_duration}s") + else: + log.info("Skip optimization") + + if self.read_dur_after_write > 0: + final_search_future = executor.submit(self.run_search) + final_search_res = final_search_future.result() + else: + log.info("Skip search after write") + final_search_res = [] + + if search_res is not None and final_search_res is not None: + r = [*search_res, *final_search_res] + m.st_search_stage_list = [d[0] for d in r] + m.st_search_time_list = [round(d[1] - start_time, 4) for d in r] + m.st_max_qps_list_list = [d[2] for d in r] + m.st_recall_list = [d[3] for d in r] + m.st_ndcg_list = [d[4] for d in r] + m.st_serial_latency_p99_list = [d[5] for d in r] + m.st_conc_failed_rate_list = [d[6] for d in r] - search_future = executor.submit(self.run_search) - last_res = search_future.result() - - log.info(f"Max QPS after optimze and search: {last_res}") except Exception as e: log.warning(f"Read and write error: {e}") executor.shutdown(wait=True, cancel_futures=True) - raise e from e - log.info("Concurrent read write all done") + # raise e + m.st_ideal_insert_duration = math.ceil(self.data_volume / self.insert_rate) + log.info(f"Concurrent read write all done, results: {m}") + return m + + def get_each_conc_search_dur(self, ssearch_dur: float, cur_stage: float, next_stage: float) -> float: + # Search duration for non-last search stage is carefully calculated. + # If duration for each concurrency is less than 30s, runner will raise error. + total_dur_between_stages = self.data_volume * (next_stage - cur_stage) // self.insert_rate + csearch_dur = total_dur_between_stages - ssearch_dur + + # Try to leave room for init process executors + if csearch_dur > 60: + csearch_dur -= 30 + elif csearch_dur > 30: + csearch_dur -= 15 + else: + csearch_dur /= 2 + + each_conc_search_dur = csearch_dur / len(self.concurrencies) + if each_conc_search_dur < 30: + warning_msg = ( + f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short," + f" total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}." + ) + log.warning(warning_msg) + return each_conc_search_dur def run_search_by_sig(self, q: mp.Queue): """ @@ -151,7 +201,7 @@ def wait_next_target(start: int, target_batch: int) -> bool: start += 1 return True - for idx, stage in enumerate(self.search_stage): + for idx, stage in enumerate(self.search_stages): target_batch = int(total_batch * stage) perc = int(stage * 100) @@ -163,41 +213,29 @@ def wait_next_target(start: int, target_batch: int) -> bool: return None log.info(f"Insert {perc}% done, total batch={total_batch}") - log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start") - res, ssearch_dur = self.serial_search_runner.run() - recall, ndcg, p99_latency = res - log.info( - f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, ", - f"ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}", - ) - - # Search duration for non-last search stage is carefully calculated. - # If duration for each concurrency is less than 30s, runner will raise error. - if idx < len(self.search_stage) - 1: - total_dur_between_stages = self.data_volume * (self.search_stage[idx + 1] - stage) // self.insert_rate - csearch_dur = total_dur_between_stages - ssearch_dur - - # Try to leave room for init process executors - csearch_dur = csearch_dur - 30 if csearch_dur > 60 else csearch_dur - - each_conc_search_dur = csearch_dur / len(self.concurrencies) - if each_conc_search_dur < 30: - warning_msg = ( - f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short, ", - f"total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}.", - ) - log.warning(warning_msg) - - # The last stage - else: - each_conc_search_dur = 60 - - log.info( - f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}", - ) - max_qps = self.run_by_dur(each_conc_search_dur) - result.append((perc, max_qps, recall, ndcg, p99_latency)) + test_time = round(time.perf_counter(), 4) + max_qps, recall, ndcg, p99_latency, conc_failed_rate = 0, 0, 0, 0, 0 + try: + log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start") + res, ssearch_dur = self.serial_search_runner.run() + recall, ndcg, p99_latency = res + log.info( + f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}" + ) + each_conc_search_dur = self.get_each_conc_search_dur( + ssearch_dur, + cur_stage=stage, + next_stage=self.search_stages[idx + 1] if idx < len(self.search_stages) - 1 else 1.0, + ) + log.info( + f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}" + ) + if each_conc_search_dur > 0: + max_qps, conc_failed_rate = self.run_by_dur(each_conc_search_dur) + except Exception as e: + log.warning(f"Streaming Search Failed at stage={stage}. Exception: {e}") + result.append((perc, test_time, max_qps, recall, ndcg, p99_latency, conc_failed_rate)) start_batch = target_batch # Drain the queue diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py index 7eb59432b..edd65e887 100644 --- a/vectordb_bench/backend/runner/serial_runner.py +++ b/vectordb_bench/backend/runner/serial_runner.py @@ -6,7 +6,6 @@ import traceback import numpy as np -import pandas as pd import psutil from vectordb_bench.backend.dataset import DatasetManager @@ -189,12 +188,15 @@ def run(self) -> int: return count +Max_Search_Retry = 5 + + class SerialSearchRunner: def __init__( self, db: api.VectorDB, test_data: list[list[float]], - ground_truth: pd.DataFrame, + ground_truth: list[list[int]], k: int = 100, filters: dict | None = None, ): @@ -208,35 +210,44 @@ def __init__( self.test_data = test_data self.ground_truth = ground_truth - def search(self, args: tuple[list, pd.DataFrame]) -> tuple[float, float, float]: - log.info( - f"{mp.current_process().name:14} start search the entire test_data to get recall and latency", - ) + def _get_db_search_res(self, emb, retry_idx: int = 0) -> list[int]: + try: + results = self.db.search_embedding( + emb, + self.k, + self.filters, + ) + except Exception as e: + log.warning(f"Serial search failed, retry_idx={retry_idx}, Exception: {e}") + if retry_idx < Max_Search_Retry: + return self._get_db_search_res(emb=emb, retry_idx=retry_idx + 1) + else: + raise RuntimeError(f"Serial search failed and retried more than {Max_Search_Retry} times") from e + + return results + + def search(self, args: tuple[list, list[list[int]]]) -> tuple[float, float, float]: + log.info(f"{mp.current_process().name:14} start search the entire test_data to get recall and latency") with self.db.init(): test_data, ground_truth = args ideal_dcg = get_ideal_dcg(self.k) log.debug(f"test dataset size: {len(test_data)}") - log.debug(f"ground truth size: {ground_truth.columns}, shape: {ground_truth.shape}") + log.debug(f"ground truth size: {len(ground_truth)}") latencies, recalls, ndcgs = [], [], [] for idx, emb in enumerate(test_data): s = time.perf_counter() try: - results = self.db.search_embedding( - emb, - self.k, - self.filters, - ) - + results = self._get_db_search_res(emb) except Exception as e: log.warning(f"VectorDB search_embedding error: {e}") - traceback.print_exc(chain=True) + # traceback.print_exc(chain=True) raise e from None latencies.append(time.perf_counter() - s) - gt = ground_truth["neighbors_id"][idx] + gt = ground_truth[idx] recalls.append(calc_recall(self.k, gt[: self.k], results)) ndcgs.append(calc_ndcg(gt[: self.k], results, ideal_dcg)) diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py index e24d74f03..9d0092844 100644 --- a/vectordb_bench/backend/task_runner.py +++ b/vectordb_bench/backend/task_runner.py @@ -6,15 +6,14 @@ import numpy as np import psutil -from vectordb_bench.base import BaseModel -from vectordb_bench.metric import Metric -from vectordb_bench.models import PerformanceTimeoutError, TaskConfig, TaskStage - +from ..base import BaseModel +from ..metric import Metric +from ..models import PerformanceTimeoutError, TaskConfig, TaskStage from . import utils -from .cases import Case, CaseLabel +from .cases import Case, CaseLabel, StreamingPerformanceCase from .clients import MetricType, api from .data_source import DatasetSource -from .runner import MultiProcessingSearchRunner, SerialInsertRunner, SerialSearchRunner +from .runner import MultiProcessingSearchRunner, ReadWriteRunner, SerialInsertRunner, SerialSearchRunner log = logging.getLogger(__name__) @@ -48,6 +47,7 @@ class CaseRunner(BaseModel): serial_search_runner: SerialSearchRunner | None = None search_runner: MultiProcessingSearchRunner | None = None final_search_runner: MultiProcessingSearchRunner | None = None + read_write_runner: ReadWriteRunner | None = None def __eq__(self, obj: any): if isinstance(obj, CaseRunner): @@ -63,6 +63,7 @@ def display(self) -> dict: c_dict = self.ca.dict( include={ "label": True, + "name": True, "filters": True, "dataset": { "data": { @@ -112,6 +113,8 @@ def run(self, drop_old: bool = True) -> Metric: return self._run_capacity_case() if self.ca.label == CaseLabel.Performance: return self._run_perf_case(drop_old) + if self.ca.label == CaseLabel.Streaming: + return self._run_streaming_case() msg = f"unknown case type: {self.ca.label}" log.warning(msg) raise ValueError(msg) @@ -192,10 +195,6 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric: ) = search_results if TaskStage.SEARCH_SERIAL in self.config.stages: search_results = self._serial_search() - """ - m.recall = search_results.recall - m.serial_latencies = search_results.serial_latencies - """ m.recall, m.ndcg, m.serial_latency_p99 = search_results except Exception as e: @@ -206,6 +205,19 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric: log.info(f"Performance case got result: {m}") return m + def _run_streaming_case(self) -> Metric: + log.info("Start streaming case") + try: + self._init_read_write_runner() + m = self.read_write_runner.run_read_write() + except Exception as e: + log.warning(f"Failed to run streaming case, reason = {e}") + traceback.print_exc() + raise e from None + else: + log.info(f"Streaming case got result: {m}") + return m + @utils.time_it def _load_train_data(self): """Insert train data and get the insert_duration""" @@ -222,7 +234,7 @@ def _load_train_data(self): finally: runner = None - def _serial_search(self) -> tuple[float, float, float]: + def _serial_search(self) -> tuple[float, float]: """Performance serial tests, search the entire test data once, calculate the recall, serial_latency_p99 @@ -273,10 +285,12 @@ def _optimize(self) -> float: raise e from None def _init_search_runner(self): - test_emb = np.stack(self.ca.dataset.test_data["emb"]) if self.normalize: + test_emb = np.stack(self.ca.dataset.test_data) test_emb = test_emb / np.linalg.norm(test_emb, axis=1)[:, np.newaxis] - self.test_emb = test_emb.tolist() + self.test_emb = test_emb.tolist() + else: + self.test_emb = self.ca.dataset.test_data gt_df = self.ca.dataset.gt_data @@ -298,6 +312,19 @@ def _init_search_runner(self): k=self.config.case_config.k, ) + def _init_read_write_runner(self): + ca: StreamingPerformanceCase = self.ca + self.read_write_runner = ReadWriteRunner( + db=self.db, + dataset=ca.dataset, + insert_rate=ca.insert_rate, + search_stages=ca.search_stages, + optimize_after_write=ca.optimize_after_write, + read_dur_after_write=ca.read_dur_after_write, + concurrencies=ca.concurrencies, + k=self.config.case_config.k, + ) + def stop(self): if self.search_runner: self.search_runner.stop() diff --git a/vectordb_bench/frontend/components/check_results/data.py b/vectordb_bench/frontend/components/check_results/data.py index 94d1b4eab..57f42c28e 100644 --- a/vectordb_bench/frontend/components/check_results/data.py +++ b/vectordb_bench/frontend/components/check_results/data.py @@ -23,7 +23,7 @@ def getFilterTasks( task for task in tasks if task.task_config.db_name in dbNames - and task.task_config.case_config.case_id.case_cls(task.task_config.case_config.custom_case).name in caseNames + and task.task_config.case_config.case_name in caseNames ] return filterTasks @@ -35,17 +35,22 @@ def mergeTasks(tasks: list[CaseResult]): db = task.task_config.db.value db_label = task.task_config.db_config.db_label or "" version = task.task_config.db_config.version or "" - case = task.task_config.case_config.case_id.case_cls(task.task_config.case_config.custom_case) + case = task.task_config.case_config.case + case_name = case.name + dataset_name = case.dataset.data.full_name + filter_rate = case.filter_rate dbCaseMetricsMap[db_name][case.name] = { "db": db, "db_label": db_label, "version": version, + "dataset_name": dataset_name, + "filter_rate": filter_rate, "metrics": mergeMetrics( - dbCaseMetricsMap[db_name][case.name].get("metrics", {}), + dbCaseMetricsMap[db_name][case_name].get("metrics", {}), asdict(task.metrics), ), "label": getBetterLabel( - dbCaseMetricsMap[db_name][case.name].get("label", ResultLabel.FAILED), + dbCaseMetricsMap[db_name][case_name].get("label", ResultLabel.FAILED), task.label, ), } @@ -59,12 +64,16 @@ def mergeTasks(tasks: list[CaseResult]): db_label = metricInfo["db_label"] version = metricInfo["version"] label = metricInfo["label"] + dataset_name = metricInfo["dataset_name"] + filter_rate = metricInfo["filter_rate"] if label == ResultLabel.NORMAL: mergedTasks.append( { "db_name": db_name, "db": db, "db_label": db_label, + "dataset_name": dataset_name, + "filter_rate": filter_rate, "version": version, "case_name": case_name, "metricsSet": set(metrics.keys()), diff --git a/vectordb_bench/frontend/components/check_results/filters.py b/vectordb_bench/frontend/components/check_results/filters.py index 129c1d5ae..445fd9d48 100644 --- a/vectordb_bench/frontend/components/check_results/filters.py +++ b/vectordb_bench/frontend/components/check_results/filters.py @@ -1,14 +1,17 @@ from vectordb_bench.backend.cases import Case from vectordb_bench.frontend.components.check_results.data import getChartData -from vectordb_bench.frontend.components.check_results.expanderStyle import initSidebarExanderStyle +from vectordb_bench.frontend.components.check_results.expanderStyle import ( + initSidebarExanderStyle, +) from vectordb_bench.frontend.config.dbCaseConfigs import CASE_NAME_ORDER -from vectordb_bench.frontend.config.styles import * +from vectordb_bench.frontend.config.styles import SIDEBAR_CONTROL_COLUMNS import streamlit as st +from typing import Callable from vectordb_bench.models import CaseResult, TestResult -def getshownData(results: list[TestResult], st): +def getshownData(st, results: list[TestResult], **kwargs): # hide the nav st.markdown( "", @@ -17,15 +20,20 @@ def getshownData(results: list[TestResult], st): st.header("Filters") - shownResults = getshownResults(results, st) - showDBNames, showCaseNames = getShowDbsAndCases(shownResults, st) + shownResults = getshownResults(st, results, **kwargs) + showDBNames, showCaseNames = getShowDbsAndCases(st, shownResults) shownData, failedTasks = getChartData(shownResults, showDBNames, showCaseNames) return shownData, failedTasks, showCaseNames -def getshownResults(results: list[TestResult], st) -> list[CaseResult]: +def getshownResults( + st, + results: list[TestResult], + case_results_filter: Callable[[CaseResult], bool] = lambda x: True, + **kwargs, +) -> list[CaseResult]: resultSelectOptions = [ result.task_label if result.task_label != result.run_id else f"res-{result.run_id[:4]}" for result in results ] @@ -41,23 +49,17 @@ def getshownResults(results: list[TestResult], st) -> list[CaseResult]: ) selectedResult: list[CaseResult] = [] for option in selectedResultSelectedOptions: - result = results[resultSelectOptions.index(option)].results - selectedResult += result + case_results = results[resultSelectOptions.index(option)].results + selectedResult += [r for r in case_results if case_results_filter(r)] return selectedResult -def getShowDbsAndCases(result: list[CaseResult], st) -> tuple[list[str], list[str]]: +def getShowDbsAndCases(st, result: list[CaseResult]) -> tuple[list[str], list[str]]: initSidebarExanderStyle(st) allDbNames = list(set({res.task_config.db_name for res in result})) allDbNames.sort() - allCases: list[Case] = [ - res.task_config.case_config.case_id.case_cls(res.task_config.case_config.custom_case) for res in result - ] - allCaseNameSet = set({case.name for case in allCases}) - allCaseNames = [case_name for case_name in CASE_NAME_ORDER if case_name in allCaseNameSet] + [ - case_name for case_name in allCaseNameSet if case_name not in CASE_NAME_ORDER - ] + allCases: list[Case] = [res.task_config.case_config.case for res in result] # DB Filter dbFilterContainer = st.container() @@ -67,6 +69,12 @@ def getShowDbsAndCases(result: list[CaseResult], st) -> tuple[list[str], list[st allDbNames, col=1, ) + showCaseNames = [] + + allCaseNameSet = set({case.name for case in allCases}) + allCaseNames = [ + case_name for case_name in CASE_NAME_ORDER if case_name in allCaseNameSet + ] + [case_name for case_name in allCaseNameSet if case_name not in CASE_NAME_ORDER] # Case Filter caseFilterContainer = st.container() diff --git a/vectordb_bench/frontend/components/check_results/headerIcon.py b/vectordb_bench/frontend/components/check_results/headerIcon.py index 4715701f6..cd18251e8 100644 --- a/vectordb_bench/frontend/components/check_results/headerIcon.py +++ b/vectordb_bench/frontend/components/check_results/headerIcon.py @@ -8,8 +8,8 @@ def drawHeaderIcon(st):