diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py
index c07fc855..37e8d470 100644
--- a/vectordb_bench/__init__.py
+++ b/vectordb_bench/__init__.py
@@ -66,6 +66,7 @@ class config:
CAPACITY_TIMEOUT_IN_SECONDS = 24 * 3600 # 24h
LOAD_TIMEOUT_DEFAULT = 24 * 3600 # 24h
+ LOAD_TIMEOUT_768D_100K = 24 * 3600 # 24h
LOAD_TIMEOUT_768D_1M = 24 * 3600 # 24h
LOAD_TIMEOUT_768D_10M = 240 * 3600 # 10d
LOAD_TIMEOUT_768D_100M = 2400 * 3600 # 100d
@@ -74,6 +75,7 @@ class config:
LOAD_TIMEOUT_1536D_5M = 240 * 3600 # 10d
OPTIMIZE_TIMEOUT_DEFAULT = 24 * 3600 # 24h
+ OPTIMIZE_TIMEOUT_768D_100K = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_1M = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_10M = 240 * 3600 # 10d
OPTIMIZE_TIMEOUT_768D_100M = 2400 * 3600 # 100d
diff --git a/vectordb_bench/backend/assembler.py b/vectordb_bench/backend/assembler.py
index b81d315c..d945c299 100644
--- a/vectordb_bench/backend/assembler.py
+++ b/vectordb_bench/backend/assembler.py
@@ -39,6 +39,7 @@ def assemble_all(
runners = [cls.assemble(run_id, task, source) for task in tasks]
load_runners = [r for r in runners if r.ca.label == CaseLabel.Load]
perf_runners = [r for r in runners if r.ca.label == CaseLabel.Performance]
+ streaming_runners = [r for r in runners if r.ca.label == CaseLabel.Streaming]
# group by db
db2runner = {}
@@ -58,6 +59,7 @@ def assemble_all(
all_runners = []
all_runners.extend(load_runners)
+ all_runners.extend(streaming_runners)
for v in db2runner.values():
all_runners.extend(v)
diff --git a/vectordb_bench/backend/cases.py b/vectordb_bench/backend/cases.py
index 15fc069c..8b48441c 100644
--- a/vectordb_bench/backend/cases.py
+++ b/vectordb_bench/backend/cases.py
@@ -1,3 +1,4 @@
+import json
import logging
from enum import Enum, auto
@@ -8,7 +9,7 @@
CustomDatasetConfig,
)
-from .dataset import CustomDataset, Dataset, DatasetManager
+from .dataset import CustomDataset, Dataset, DatasetManager, DatasetWithSizeType
log = logging.getLogger(__name__)
@@ -47,6 +48,8 @@ class CaseType(Enum):
Custom = 100
PerformanceCustomDataset = 101
+ StreamingPerformanceCase = 200
+
def case_cls(self, custom_configs: dict | None = None) -> type["Case"]:
if custom_configs is None:
return type2case.get(self)()
@@ -68,6 +71,7 @@ def case_description(self, custom_configs: dict | None = None) -> str:
class CaseLabel(Enum):
Load = auto()
Performance = auto()
+ Streaming = auto()
class Case(BaseModel):
@@ -87,7 +91,7 @@ class Case(BaseModel):
description: str
dataset: DatasetManager
- load_timeout: float | int
+ load_timeout: float | int | None = None
optimize_timeout: float | int | None = None
filter_rate: float | None = None
@@ -104,14 +108,14 @@ def filters(self) -> dict | None:
return None
-class CapacityCase(Case, BaseModel):
+class CapacityCase(Case):
label: CaseLabel = CaseLabel.Load
filter_rate: float | None = None
load_timeout: float | int = config.CAPACITY_TIMEOUT_IN_SECONDS
optimize_timeout: float | int | None = None
-class PerformanceCase(Case, BaseModel):
+class PerformanceCase(Case):
label: CaseLabel = CaseLabel.Performance
filter_rate: float | None = None
load_timeout: float | int = config.LOAD_TIMEOUT_DEFAULT
@@ -349,6 +353,63 @@ def __init__(
)
+Streaming_Insert_Rate_Step = 100
+
+
+class StreamingPerformanceCase(Case):
+ case_id: CaseType = CaseType.StreamingPerformanceCase
+ label: CaseLabel = CaseLabel.Streaming
+ dataset_with_size_type: DatasetWithSizeType
+ insert_rate: int
+ search_stages: list[float]
+ concurrencies: list[int]
+ optimize_after_write: bool = True
+ read_dur_after_write: int = 30
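+ # search_stages: fractions of the dataset (e.g. 0.5 = 50% inserted) at which search performance is measured;
+ # read_dur_after_write: seconds of concurrent search per concurrency level once insertion has finished.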
+
+ def __init__(
+ self,
+ dataset_with_size_type: DatasetWithSizeType | str = DatasetWithSizeType.CohereSmall.value,
+ insert_rate: int = 500,
+ search_stages: list[float] | str = (0.5, 0.8),
+ concurrencies: list[int] | str = (5, 10),
+ **kwargs,
+ ):
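+ # if insert_rate is not a whole number of batches (config.NUM_PER_BATCH), it is rounded
+ # down to a multiple of Streaming_Insert_Rate_Step (minimum one step) before use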
+ if insert_rate % config.NUM_PER_BATCH != 0:
+ _insert_rate = max(
+ Streaming_Insert_Rate_Step,
+ insert_rate // Streaming_Insert_Rate_Step * Streaming_Insert_Rate_Step,
+ )
+ log.warning(
+ f"[streaming_case init] insert_rate(={insert_rate}) should be "
+ f"divisible by {Streaming_Insert_Rate_Step}), reset to {_insert_rate}",
+ )
+ insert_rate = _insert_rate
+ if not isinstance(dataset_with_size_type, DatasetWithSizeType):
+ dataset_with_size_type = DatasetWithSizeType(dataset_with_size_type)
+ dataset = dataset_with_size_type.get_manager()
+ name = f"Streaming-Perf - {dataset_with_size_type.value}, {insert_rate} rows/s"
+ description = (
+ "This case tests the search performance of vector database while maintaining "
+ f"a fixed insertion speed. (dataset: {dataset_with_size_type.value})"
+ )
+
+ if isinstance(search_stages, str):
+ search_stages = json.loads(search_stages)
+ if isinstance(concurrencies, str):
+ concurrencies = json.loads(concurrencies)
+
+ super().__init__(
+ name=name,
+ description=description,
+ dataset=dataset,
+ dataset_with_size_type=dataset_with_size_type,
+ insert_rate=insert_rate,
+ search_stages=search_stages,
+ concurrencies=concurrencies,
+ **kwargs,
+ )
+
+
type2case = {
CaseType.CapacityDim960: CapacityDim960,
CaseType.CapacityDim128: CapacityDim128,
@@ -367,4 +428,5 @@ def __init__(
CaseType.Performance1536D5M99P: Performance1536D5M99P,
CaseType.Performance1536D50K: Performance1536D50K,
CaseType.PerformanceCustomDataset: PerformanceCustomDataset,
+ CaseType.StreamingPerformanceCase: StreamingPerformanceCase,
}
diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py
index 62700b0f..7c030709 100644
--- a/vectordb_bench/backend/dataset.py
+++ b/vectordb_bench/backend/dataset.py
@@ -39,6 +39,15 @@ class BaseDataset(BaseModel):
with_gt: bool = False
_size_label: dict[int, SizeLabel] = PrivateAttr()
is_custom: bool = False
+ with_remote_resource: bool = True
+ with_scalar_labels: bool = False
+ train_id_field: str = "id"
+ train_vector_field: str = "emb"
+ test_file: str = "test.parquet"
+ test_id_field: str = "id"
+ test_vector_field: str = "emb"
+ gt_id_field: str = "id"
+ gt_neighbors_field: str = "neighbors_id"
@validator("size")
def verify_size(cls, v: int):
@@ -51,6 +60,10 @@ def verify_size(cls, v: int):
def label(self) -> str:
return self._size_label.get(self.size).label
+ @property
+ def full_name(self) -> str:
+ return f"{self.name.capitalize()} ({self.label.capitalize()})"
+
@property
def dir_name(self) -> str:
return f"{self.name}_{self.label}_{utils.numerize(self.size)}".lower()
@@ -59,11 +72,16 @@ def dir_name(self) -> str:
def file_count(self) -> int:
return self._size_label.get(self.size).file_count
+ @property
+ def train_files(self) -> list[str]:
+ return utils.compose_train_files(self.file_count, self.use_shuffled)
+
class CustomDataset(BaseDataset):
dir: str
file_num: int
is_custom: bool = True
+ with_remote_resource: bool = False
@validator("size")
def verify_size(cls, v: int):
@@ -109,7 +127,7 @@ class Cohere(BaseDataset):
dim: int = 768
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = config.USE_SHUFFLED_DATA
- with_gt: bool = (True,)
+ with_gt: bool = True
_size_label: dict = {
100_000: SizeLabel(100_000, "SMALL", 1),
1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
@@ -146,7 +164,7 @@ class OpenAI(BaseDataset):
dim: int = 1536
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = config.USE_SHUFFLED_DATA
- with_gt: bool = (True,)
+ with_gt: bool = True
_size_label: dict = {
50_000: SizeLabel(50_000, "SMALL", 1),
500_000: SizeLabel(500_000, "MEDIUM", 1),
@@ -166,8 +184,8 @@ class DatasetManager(BaseModel):
"""
data: BaseDataset
- test_data: pd.DataFrame | None = None
- gt_data: pd.DataFrame | None = None
+ test_data: list[list[float]] | None = None
+ gt_data: list[list[int]] | None = None
train_files: list[str] = []
reader: DatasetReader | None = None
@@ -191,7 +209,7 @@ def data_dir(self) -> pathlib.Path:
return pathlib.Path(
config.DATASET_LOCAL_DIR,
self.data.name.lower(),
- self.data.dir_name.lower(),
+ self.data.dir_name,
)
def __iter__(self):
@@ -215,29 +233,24 @@ def prepare(
bool: whether the dataset is successfully prepared
"""
- file_count, use_shuffled = self.data.file_count, self.data.use_shuffled
-
- train_files = utils.compose_train_files(file_count, use_shuffled)
- all_files = train_files
-
+ self.train_files = self.data.train_files
gt_file, test_file = None, None
if self.data.with_gt:
- gt_file, test_file = utils.compose_gt_file(filters), "test.parquet"
- all_files.extend([gt_file, test_file])
+ gt_file, test_file = utils.compose_gt_file(filters), self.data.test_file
- if not self.data.is_custom:
+ if self.data.with_remote_resource:
+ download_files = [file for file in self.train_files]
+ download_files.extend([gt_file, test_file])
source.reader().read(
dataset=self.data.dir_name.lower(),
- files=all_files,
+ files=download_files,
local_ds_root=self.data_dir,
)
if gt_file is not None and test_file is not None:
- self.test_data = self._read_file(test_file)
- self.gt_data = self._read_file(gt_file)
+ self.test_data = self._read_file(test_file)[self.data.test_vector_field].to_list()
+ self.gt_data = self._read_file(gt_file)[self.data.gt_neighbors_field].to_list()
- prefix = "shuffle_train" if use_shuffled else "train"
- self.train_files = sorted([f.name for f in self.data_dir.glob(f"{prefix}*.parquet")])
log.debug(f"{self.data.name}: available train files {self.train_files}")
return True
@@ -313,3 +326,47 @@ def get(self, size: int) -> BaseDataset:
def manager(self, size: int) -> DatasetManager:
return DatasetManager(data=self.get(size))
+
+
+class DatasetWithSizeType(Enum):
+ CohereSmall = "Small Cohere (768dim, 100K)"
+ CohereMedium = "Medium Cohere (768dim, 1M)"
+ CohereLarge = "Large Cohere (768dim, 10M)"
+ OpenAISmall = "Small OpenAI (1536dim, 50K)"
+ OpenAIMedium = "Medium OpenAI (1536dim, 500K)"
+ OpenAILarge = "Large OpenAI (1536dim, 5M)"
+
+ def get_manager(self) -> DatasetManager:
+ if self not in DatasetWithSizeMap:
+ msg = f"wrong ScalarDatasetWithSizeType: {self.name}"
+ raise ValueError(msg)
+ return DatasetWithSizeMap.get(self)
+
+ def get_load_timeout(self) -> float:
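+ # the timeout is selected by the size keyword in the enum value; note that the 1536-dim
+ # OpenAI datasets currently reuse the 768D timeout constants here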
+ if "small" in self.value.lower():
+ return config.LOAD_TIMEOUT_768D_100K
+ if "medium" in self.value.lower():
+ return config.LOAD_TIMEOUT_768D_1M
+ if "large" in self.value.lower():
+ return config.LOAD_TIMEOUT_768D_10M
+ msg = f"No load_timeout for {self.value}"
+ raise KeyError(msg)
+
+ def get_optimize_timeout(self) -> float:
+ if "small" in self.value.lower():
+ return config.OPTIMIZE_TIMEOUT_768D_100K
+ if "medium" in self.value.lower():
+ return config.OPTIMIZE_TIMEOUT_768D_1M
+ if "large" in self.value.lower():
+ return config.OPTIMIZE_TIMEOUT_768D_10M
+ return config.OPTIMIZE_TIMEOUT_DEFAULT
+
+
+DatasetWithSizeMap = {
+ DatasetWithSizeType.CohereSmall: Dataset.COHERE.manager(100_000),
+ DatasetWithSizeType.CohereMedium: Dataset.COHERE.manager(1_000_000),
+ DatasetWithSizeType.CohereLarge: Dataset.COHERE.manager(10_000_000),
+ DatasetWithSizeType.OpenAISmall: Dataset.OPENAI.manager(50_000),
+ DatasetWithSizeType.OpenAIMedium: Dataset.OPENAI.manager(500_000),
+ DatasetWithSizeType.OpenAILarge: Dataset.OPENAI.manager(5_000_000),
+}
diff --git a/vectordb_bench/backend/runner/__init__.py b/vectordb_bench/backend/runner/__init__.py
index b83df6f9..ceb980a9 100644
--- a/vectordb_bench/backend/runner/__init__.py
+++ b/vectordb_bench/backend/runner/__init__.py
@@ -1,10 +1,10 @@
-from .mp_runner import (
- MultiProcessingSearchRunner,
-)
+from .mp_runner import MultiProcessingSearchRunner
+from .read_write_runner import ReadWriteRunner
from .serial_runner import SerialInsertRunner, SerialSearchRunner
__all__ = [
"MultiProcessingSearchRunner",
"SerialInsertRunner",
"SerialSearchRunner",
+ "ReadWriteRunner",
]
diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py
index 5b69b548..d6ed282b 100644
--- a/vectordb_bench/backend/runner/mp_runner.py
+++ b/vectordb_bench/backend/runner/mp_runner.py
@@ -67,12 +67,13 @@ def search(
self.k,
self.filters,
)
+ count += 1
+ latencies.append(time.perf_counter() - s)
except Exception as e:
log.warning(f"VectorDB search_embedding error: {e}")
- traceback.print_exc(chain=True)
- raise e from None
+ # traceback.print_exc(chain=True)
+ # raise e from None
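+ # failed searches are now logged and skipped instead of aborting the whole run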
- latencies.append(time.perf_counter() - s)
- count += 1
# loop through the test data
idx = idx + 1 if idx < num - 1 else 0
@@ -180,10 +181,10 @@ def run(self) -> float:
def stop(self) -> None:
pass
- def run_by_dur(self, duration: int) -> float:
+ def run_by_dur(self, duration: int) -> tuple[float, float]:
return self._run_by_dur(duration)
- def _run_by_dur(self, duration: int) -> float:
+ def _run_by_dur(self, duration: int) -> tuple[float, float]:
max_qps = 0
try:
for conc in self.concurrencies:
@@ -211,14 +212,17 @@ def _run_by_dur(self, duration: int) -> float:
)
start = time.perf_counter()
- all_count = sum([r.result() for r in future_iter])
+ res = [r.result() for r in future_iter]
+ all_success_count = sum([r[0] for r in res])
+ all_failed_count = sum([r[1] for r in res])
+ failed_rate = all_failed_count / (all_failed_count + all_success_count)
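+ # failed_rate aggregates failures across all worker processes at this concurrency level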
cost = time.perf_counter() - start
- qps = round(all_count / cost, 4)
+ qps = round(all_success_count / cost, 4)
log.info(
- f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}",
+ f"End search in concurrency {conc}: dur={cost}s, failed_rate={failed_rate}"
+ f"all_success_count={all_success_count}, all_failed_count={all_failed_count}, qps={qps}",
)
-
if qps > max_qps:
max_qps = qps
log.info(
@@ -237,15 +241,9 @@ def _run_by_dur(self, duration: int) -> float:
finally:
self.stop()
- return max_qps
+ return max_qps, failed_rate
- def search_by_dur(
- self,
- dur: int,
- test_data: list[list[float]],
- q: mp.Queue,
- cond: mp.Condition,
- ) -> int:
+ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int]:
# sync all process
q.put(1)
with cond:
@@ -255,7 +253,8 @@ def search_by_dur(
num, idx = len(test_data), random.randint(0, len(test_data) - 1)
start_time = time.perf_counter()
- count = 0
+ success_count = 0
+ failed_cnt = 0
while time.perf_counter() < start_time + dur:
s = time.perf_counter()
try:
@@ -264,25 +263,27 @@ def search_by_dur(
self.k,
self.filters,
)
+ success_count += 1
except Exception as e:
+ failed_cnt += 1
log.warning(f"VectorDB search_embedding error: {e}")
- traceback.print_exc(chain=True)
- raise e from None
+ # traceback.print_exc(chain=True)
+ # raise e from None
- count += 1
# loop through the test data
idx = idx + 1 if idx < num - 1 else 0
- if count % 500 == 0:
+ if success_count % 500 == 0:
log.debug(
- f"({mp.current_process().name:16}) search_count: {count}, ",
+ f"({mp.current_process().name:16}) search_count: {success_count}, "
f"latest_latency={time.perf_counter()-s}",
)
total_dur = round(time.perf_counter() - start_time, 4)
log.debug(
f"{mp.current_process().name:16} search {self.duration}s: "
- f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}",
+ f"actual_dur={total_dur}s, count={success_count}, failed_cnt={failed_cnt}, "
+ f"qps (successful) in this process: {round(success_count / total_dur, 4):3}",
)
- return count
+ return success_count, failed_cnt
diff --git a/vectordb_bench/backend/runner/rate_runner.py b/vectordb_bench/backend/runner/rate_runner.py
index 0145af4c..dc2b066b 100644
--- a/vectordb_bench/backend/runner/rate_runner.py
+++ b/vectordb_bench/backend/runner/rate_runner.py
@@ -5,7 +5,7 @@
from concurrent.futures import ThreadPoolExecutor
from vectordb_bench import config
-from vectordb_bench.backend.clients import api
+from vectordb_bench.backend.clients import DB, api
from vectordb_bench.backend.dataset import DataSetIterator
from vectordb_bench.backend.utils import time_it
@@ -13,6 +13,9 @@
log = logging.getLogger(__name__)
+Time_Per_Insert_Batch = 1 # 1s
+Max_Insert_Retry = 5
+
class RatedMultiThreadingInsertRunner:
def __init__(
@@ -30,78 +33,92 @@ def __init__(
self.insert_rate = rate
self.batch_rate = rate // config.NUM_PER_BATCH
- def send_insert_task(self, db: api.VectorDB, emb: list[list[float]], metadata: list[str]):
- db.insert_embeddings(emb, metadata)
+ self.executing_futures = []
+ self.sig_idx = 0
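+ # sig_idx: completed insert futures not yet converted into a progress signal on the queue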
+
+ def send_insert_task(self, db: api.VectorDB, emb: list[list[float]], metadata: list[str], retry_idx: int = 0):
+ try:
+ db.insert_embeddings(emb, metadata)
+ except Exception as e:
+ log.warning(f"Insert Failed, try_idx={retry_idx}, Exception: {e}")
+ retry_idx += 1
+ if retry_idx <= Max_Insert_Retry:
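+ # linear backoff: wait retry_idx seconds before the next attempt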
+ time.sleep(retry_idx)
+ self.send_insert_task(db, emb=emb, metadata=metadata, retry_idx=retry_idx)
+ else:
+ msg = f"Insert failed and retried more than {Max_Insert_Retry} times"
+ raise RuntimeError(msg) from e
@time_it
def run_with_rate(self, q: mp.Queue):
with ThreadPoolExecutor(max_workers=mp.cpu_count()) as executor:
- executing_futures = []
@time_it
def submit_by_rate() -> bool:
rate = self.batch_rate
for data in self.dataset:
emb, metadata = get_data(data, self.normalize)
- executing_futures.append(
- executor.submit(self.send_insert_task, self.db, emb, metadata),
- )
+ self.executing_futures.append(executor.submit(self.send_insert_task, self.db, emb, metadata))
rate -= 1
if rate == 0:
return False
return rate == self.batch_rate
+ def check_and_send_signal(wait_interval: float, finished: bool = False):
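+ # Wait up to wait_interval for pending insert futures, re-queue the unfinished ones,
+ # and put one signal on q per batch_rate completed batches (True only once everything
+ # is done and `finished` is set); on an insert failure, put None and shut the executor down.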
+ try:
+ done, not_done = concurrent.futures.wait(
+ self.executing_futures,
+ timeout=wait_interval,
+ return_when=concurrent.futures.FIRST_EXCEPTION,
+ )
+ _ = [fut.result() for fut in done]
+ if len(not_done) > 0:
+ log.warning(f"[{len(not_done)}] tasks are not done, trying to wait in the next round")
+ self.executing_futures = list(not_done)
+ else:
+ self.executing_futures = []
+
+ self.sig_idx += len(done)
+ while self.sig_idx >= self.batch_rate:
+ self.sig_idx -= self.batch_rate
+ if self.sig_idx < self.batch_rate and len(not_done) == 0 and finished:
+ q.put(True, block=True)
+ else:
+ q.put(False, block=False)
+
+ except Exception as e:
+ log.warning(f"task error, terminating, err={e}")
+ q.put(None, block=True)
+ executor.shutdown(wait=True, cancel_futures=True)
+ raise e
+
with self.db.init():
+ start_time = time.perf_counter()
+ inserted_batch_cnt = 0
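+ # inserted_batch_cnt counts completed 1-second submit windows so the sleep below keeps
+ # the overall throughput at roughly insert_rate rows/s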
+
while True:
- start_time = time.perf_counter()
finished, elapsed_time = submit_by_rate()
if finished is True:
- q.put(True, block=True)
- log.info(f"End of dataset, left unfinished={len(executing_futures)}")
+ log.info(f"End of dataset, left unfinished={len(self.executing_futures)}")
break
-
- q.put(False, block=False)
- wait_interval = 1 - elapsed_time if elapsed_time < 1 else 0.001
-
- try:
- done, not_done = concurrent.futures.wait(
- executing_futures,
- timeout=wait_interval,
- return_when=concurrent.futures.FIRST_EXCEPTION,
+ if elapsed_time >= 0.9:
+ log.warning(
+ f"Submit insert tasks took {elapsed_time}s, expected 1s, "
+ f"indicating potential resource limitations on the client machine.",
)
+ wait_interval = (
+ Time_Per_Insert_Batch - elapsed_time if elapsed_time < Time_Per_Insert_Batch else 0.001
+ )
- if len(not_done) > 0:
- log.warning(
- f"Failed to finish all tasks in 1s, [{len(not_done)}/{len(executing_futures)}] ",
- f"tasks are not done, waited={wait_interval:.2f}, trying to wait in the next round",
- )
- executing_futures = list(not_done)
- else:
- log.debug(
- f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} ",
- f"task in 1s, wait_interval={wait_interval:.2f}",
- )
- executing_futures = []
- except Exception as e:
- log.warning(f"task error, terminating, err={e}")
- q.put(None, block=True)
- executor.shutdown(wait=True, cancel_futures=True)
- raise e from e
-
- dur = time.perf_counter() - start_time
- if dur < 1:
- time.sleep(1 - dur)
+ check_and_send_signal(wait_interval, finished=False)
+
+ dur = time.perf_counter() - start_time - inserted_batch_cnt * Time_Per_Insert_Batch
+ if dur < Time_Per_Insert_Batch:
+ time.sleep(Time_Per_Insert_Batch - dur)
+ inserted_batch_cnt += 1
# wait for all tasks in executing_futures to complete
- if len(executing_futures) > 0:
- try:
- done, _ = concurrent.futures.wait(
- executing_futures,
- return_when=concurrent.futures.FIRST_EXCEPTION,
- )
- except Exception as e:
- log.warning(f"task error, terminating, err={e}")
- q.put(None, block=True)
- executor.shutdown(wait=True, cancel_futures=True)
- raise e from e
+ wait_interval = 1
+ while len(self.executing_futures) > 0:
+ check_and_send_signal(wait_interval, finished=True)
diff --git a/vectordb_bench/backend/runner/read_write_runner.py b/vectordb_bench/backend/runner/read_write_runner.py
index e916f45d..4efb3dd4 100644
--- a/vectordb_bench/backend/runner/read_write_runner.py
+++ b/vectordb_bench/backend/runner/read_write_runner.py
@@ -1,13 +1,17 @@
import concurrent
+import concurrent.futures
import logging
import math
import multiprocessing as mp
+import time
from collections.abc import Iterable
import numpy as np
from vectordb_bench.backend.clients import api
from vectordb_bench.backend.dataset import DatasetManager
+from vectordb_bench.backend.utils import time_it
+from vectordb_bench.metric import Metric
from .mp_runner import MultiProcessingSearchRunner
from .rate_runner import RatedMultiThreadingInsertRunner
@@ -26,33 +30,37 @@ def __init__(
k: int = 100,
filters: dict | None = None,
concurrencies: Iterable[int] = (1, 15, 50),
- search_stage: Iterable[float] = (
+ search_stages: Iterable[float] = (
0.5,
0.6,
0.7,
0.8,
0.9,
), # search from insert portion, 0.0 means search from the start
+ optimize_after_write: bool = True,
read_dur_after_write: int = 300, # seconds, search duration when insertion is done
timeout: float | None = None,
):
self.insert_rate = insert_rate
self.data_volume = dataset.data.size
- for stage in search_stage:
+ for stage in search_stages:
assert 0.0 <= stage < 1.0, "each search stage should be in [0.0, 1.0)"
- self.search_stage = sorted(search_stage)
+ self.search_stages = sorted(search_stages)
+ self.optimize_after_write = optimize_after_write
self.read_dur_after_write = read_dur_after_write
log.info(
- f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, ",
+ f"Init runner, concurencys={concurrencies}, search_stages={self.search_stages}, "
f"stage_search_dur={read_dur_after_write}",
)
- test_emb = np.stack(dataset.test_data["emb"])
if normalize:
+ test_emb = np.array(dataset.test_data)
test_emb = test_emb / np.linalg.norm(test_emb, axis=1)[:, np.newaxis]
- test_emb = test_emb.tolist()
+ test_emb = test_emb.tolist()
+ else:
+ test_emb = dataset.test_data
MultiProcessingSearchRunner.__init__(
self,
@@ -76,6 +84,7 @@ def __init__(
k=k,
)
+ @time_it
def run_optimize(self):
"""Optimize needs to run in differenct process for pymilvus schema recursion problem"""
with self.db.init():
@@ -85,49 +94,90 @@ def run_optimize(self):
def run_search(self):
log.info("Search after write - Serial search start")
+ perc = 100
+ test_time = round(time.perf_counter(), 4)
res, ssearch_dur = self.serial_search_runner.run()
recall, ndcg, p99_latency = res
log.info(
- f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, p99={p99_latency}, ",
- f"dur={ssearch_dur:.4f}",
+ f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, "
+ f"p99={p99_latency}, dur={ssearch_dur:.4f}",
)
log.info(
f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}",
)
- max_qps = self.run_by_dur(self.read_dur_after_write)
+ max_qps, conc_failed_rate = self.run_by_dur(self.read_dur_after_write)
log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}")
- return (max_qps, recall, ndcg, p99_latency)
+ return [(perc, test_time, max_qps, recall, ndcg, p99_latency, conc_failed_rate)]
- def run_read_write(self):
- with mp.Manager() as m:
- q = m.Queue()
- with concurrent.futures.ProcessPoolExecutor(
- mp_context=mp.get_context("spawn"),
- max_workers=2,
- ) as executor:
- read_write_futures = []
- read_write_futures.append(executor.submit(self.run_with_rate, q))
- read_write_futures.append(executor.submit(self.run_search_by_sig, q))
+ def run_read_write(self) -> Metric:
+ m = Metric()
+ with mp.Manager() as mp_manager:
+ q = mp_manager.Queue()
+ with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context("spawn"), max_workers=2) as executor:
+ insert_future = executor.submit(self.run_with_rate, q)
+ search_future = executor.submit(self.run_search_by_sig, q)
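+ # the rated insert and the staged searches run in two separate processes, coordinated via q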
try:
- for f in concurrent.futures.as_completed(read_write_futures):
- res = f.result()
- log.info(f"Result = {res}")
+ start_time = time.perf_counter()
+ _, m.insert_duration = insert_future.result()
+ search_res = search_future.result()
- # Wait for read_write_futures finishing and do optimize and search
+ # Wait for the insert and search futures to finish, then optimize and run the final search
- op_future = executor.submit(self.run_optimize)
- op_future.result()
+ if self.optimize_after_write:
+ op_future = executor.submit(self.run_optimize)
+ _, m.optimize_duration = op_future.result()
+ log.info(f"Optimize cost {m.optimize_duration}s")
+ else:
+ log.info("Skip optimization")
+
+ if self.read_dur_after_write > 0:
+ final_search_future = executor.submit(self.run_search)
+ final_search_res = final_search_future.result()
+ else:
+ log.info("Skip search after write")
+ final_search_res = []
+
+ if search_res is not None and final_search_res is not None:
+ r = [*search_res, *final_search_res]
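+ # each entry: (stage percent, test time, max_qps, recall, ndcg, serial p99, conc failed rate)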
+ m.st_search_stage_list = [d[0] for d in r]
+ m.st_search_time_list = [round(d[1] - start_time, 4) for d in r]
+ m.st_max_qps_list_list = [d[2] for d in r]
+ m.st_recall_list = [d[3] for d in r]
+ m.st_ndcg_list = [d[4] for d in r]
+ m.st_serial_latency_p99_list = [d[5] for d in r]
+ m.st_conc_failed_rate_list = [d[6] for d in r]
- search_future = executor.submit(self.run_search)
- last_res = search_future.result()
-
- log.info(f"Max QPS after optimze and search: {last_res}")
except Exception as e:
log.warning(f"Read and write error: {e}")
executor.shutdown(wait=True, cancel_futures=True)
- raise e from e
- log.info("Concurrent read write all done")
+ # raise e
+ m.st_ideal_insert_duration = math.ceil(self.data_volume / self.insert_rate)
+ log.info(f"Concurrent read write all done, results: {m}")
+ return m
+
+ def get_each_conc_search_dur(self, ssearch_dur: float, cur_stage: float, next_stage: float) -> float:
+ # The concurrent-search duration for a stage is derived from the time left before the next
+ # stage (or the end of insertion). If the duration per concurrency is less than 30s,
+ # results may be inaccurate and a warning is logged.
+ total_dur_between_stages = self.data_volume * (next_stage - cur_stage) // self.insert_rate
+ csearch_dur = total_dur_between_stages - ssearch_dur
+
+ # Try to leave room for init process executors
+ if csearch_dur > 60:
+ csearch_dur -= 30
+ elif csearch_dur > 30:
+ csearch_dur -= 15
+ else:
+ csearch_dur /= 2
+
+ each_conc_search_dur = csearch_dur / len(self.concurrencies)
+ if each_conc_search_dur < 30:
+ warning_msg = (
+ f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short,"
+ f" total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}."
+ )
+ log.warning(warning_msg)
+ return each_conc_search_dur
def run_search_by_sig(self, q: mp.Queue):
"""
@@ -151,7 +201,7 @@ def wait_next_target(start: int, target_batch: int) -> bool:
start += 1
return True
- for idx, stage in enumerate(self.search_stage):
+ for idx, stage in enumerate(self.search_stages):
target_batch = int(total_batch * stage)
perc = int(stage * 100)
@@ -163,41 +213,29 @@ def wait_next_target(start: int, target_batch: int) -> bool:
return None
log.info(f"Insert {perc}% done, total batch={total_batch}")
- log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
- res, ssearch_dur = self.serial_search_runner.run()
- recall, ndcg, p99_latency = res
- log.info(
- f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, ",
- f"ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}",
- )
-
- # Search duration for non-last search stage is carefully calculated.
- # If duration for each concurrency is less than 30s, runner will raise error.
- if idx < len(self.search_stage) - 1:
- total_dur_between_stages = self.data_volume * (self.search_stage[idx + 1] - stage) // self.insert_rate
- csearch_dur = total_dur_between_stages - ssearch_dur
-
- # Try to leave room for init process executors
- csearch_dur = csearch_dur - 30 if csearch_dur > 60 else csearch_dur
-
- each_conc_search_dur = csearch_dur / len(self.concurrencies)
- if each_conc_search_dur < 30:
- warning_msg = (
- f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short, ",
- f"total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}.",
- )
- log.warning(warning_msg)
-
- # The last stage
- else:
- each_conc_search_dur = 60
-
- log.info(
- f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}",
- )
- max_qps = self.run_by_dur(each_conc_search_dur)
- result.append((perc, max_qps, recall, ndcg, p99_latency))
+ test_time = round(time.perf_counter(), 4)
+ max_qps, recall, ndcg, p99_latency, conc_failed_rate = 0, 0, 0, 0, 0
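+ # zeros are reported for this stage if the serial or concurrent search below fails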
+ try:
+ log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
+ res, ssearch_dur = self.serial_search_runner.run()
+ recall, ndcg, p99_latency = res
+ log.info(
+ f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}"
+ )
+ each_conc_search_dur = self.get_each_conc_search_dur(
+ ssearch_dur,
+ cur_stage=stage,
+ next_stage=self.search_stages[idx + 1] if idx < len(self.search_stages) - 1 else 1.0,
+ )
+ log.info(
+ f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}"
+ )
+ if each_conc_search_dur > 0:
+ max_qps, conc_failed_rate = self.run_by_dur(each_conc_search_dur)
+ except Exception as e:
+ log.warning(f"Streaming Search Failed at stage={stage}. Exception: {e}")
+ result.append((perc, test_time, max_qps, recall, ndcg, p99_latency, conc_failed_rate))
start_batch = target_batch
# Drain the queue
diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py
index 7eb59432..edd65e88 100644
--- a/vectordb_bench/backend/runner/serial_runner.py
+++ b/vectordb_bench/backend/runner/serial_runner.py
@@ -6,7 +6,6 @@
import traceback
import numpy as np
-import pandas as pd
import psutil
from vectordb_bench.backend.dataset import DatasetManager
@@ -189,12 +188,15 @@ def run(self) -> int:
return count
+Max_Search_Retry = 5
+
+
class SerialSearchRunner:
def __init__(
self,
db: api.VectorDB,
test_data: list[list[float]],
- ground_truth: pd.DataFrame,
+ ground_truth: list[list[int]],
k: int = 100,
filters: dict | None = None,
):
@@ -208,35 +210,44 @@ def __init__(
self.test_data = test_data
self.ground_truth = ground_truth
- def search(self, args: tuple[list, pd.DataFrame]) -> tuple[float, float, float]:
- log.info(
- f"{mp.current_process().name:14} start search the entire test_data to get recall and latency",
- )
+ def _get_db_search_res(self, emb, retry_idx: int = 0) -> list[int]:
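+ # retry the search up to Max_Search_Retry times before giving up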
+ try:
+ results = self.db.search_embedding(
+ emb,
+ self.k,
+ self.filters,
+ )
+ except Exception as e:
+ log.warning(f"Serial search failed, retry_idx={retry_idx}, Exception: {e}")
+ if retry_idx < Max_Search_Retry:
+ return self._get_db_search_res(emb=emb, retry_idx=retry_idx + 1)
+ else:
+ raise RuntimeError(f"Serial search failed and retried more than {Max_Search_Retry} times") from e
+
+ return results
+
+ def search(self, args: tuple[list, list[list[int]]]) -> tuple[float, float, float]:
+ log.info(f"{mp.current_process().name:14} start search the entire test_data to get recall and latency")
with self.db.init():
test_data, ground_truth = args
ideal_dcg = get_ideal_dcg(self.k)
log.debug(f"test dataset size: {len(test_data)}")
- log.debug(f"ground truth size: {ground_truth.columns}, shape: {ground_truth.shape}")
+ log.debug(f"ground truth size: {len(ground_truth)}")
latencies, recalls, ndcgs = [], [], []
for idx, emb in enumerate(test_data):
s = time.perf_counter()
try:
- results = self.db.search_embedding(
- emb,
- self.k,
- self.filters,
- )
-
+ results = self._get_db_search_res(emb)
except Exception as e:
log.warning(f"VectorDB search_embedding error: {e}")
- traceback.print_exc(chain=True)
+ # traceback.print_exc(chain=True)
raise e from None
latencies.append(time.perf_counter() - s)
- gt = ground_truth["neighbors_id"][idx]
+ gt = ground_truth[idx]
recalls.append(calc_recall(self.k, gt[: self.k], results))
ndcgs.append(calc_ndcg(gt[: self.k], results, ideal_dcg))
diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py
index e24d74f0..9d009284 100644
--- a/vectordb_bench/backend/task_runner.py
+++ b/vectordb_bench/backend/task_runner.py
@@ -6,15 +6,14 @@
import numpy as np
import psutil
-from vectordb_bench.base import BaseModel
-from vectordb_bench.metric import Metric
-from vectordb_bench.models import PerformanceTimeoutError, TaskConfig, TaskStage
-
+from ..base import BaseModel
+from ..metric import Metric
+from ..models import PerformanceTimeoutError, TaskConfig, TaskStage
from . import utils
-from .cases import Case, CaseLabel
+from .cases import Case, CaseLabel, StreamingPerformanceCase
from .clients import MetricType, api
from .data_source import DatasetSource
-from .runner import MultiProcessingSearchRunner, SerialInsertRunner, SerialSearchRunner
+from .runner import MultiProcessingSearchRunner, ReadWriteRunner, SerialInsertRunner, SerialSearchRunner
log = logging.getLogger(__name__)
@@ -48,6 +47,7 @@ class CaseRunner(BaseModel):
serial_search_runner: SerialSearchRunner | None = None
search_runner: MultiProcessingSearchRunner | None = None
final_search_runner: MultiProcessingSearchRunner | None = None
+ read_write_runner: ReadWriteRunner | None = None
def __eq__(self, obj: any):
if isinstance(obj, CaseRunner):
@@ -63,6 +63,7 @@ def display(self) -> dict:
c_dict = self.ca.dict(
include={
"label": True,
+ "name": True,
"filters": True,
"dataset": {
"data": {
@@ -112,6 +113,8 @@ def run(self, drop_old: bool = True) -> Metric:
return self._run_capacity_case()
if self.ca.label == CaseLabel.Performance:
return self._run_perf_case(drop_old)
+ if self.ca.label == CaseLabel.Streaming:
+ return self._run_streaming_case()
msg = f"unknown case type: {self.ca.label}"
log.warning(msg)
raise ValueError(msg)
@@ -192,10 +195,6 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
) = search_results
if TaskStage.SEARCH_SERIAL in self.config.stages:
search_results = self._serial_search()
- """
- m.recall = search_results.recall
- m.serial_latencies = search_results.serial_latencies
- """
m.recall, m.ndcg, m.serial_latency_p99 = search_results
except Exception as e:
@@ -206,6 +205,19 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
log.info(f"Performance case got result: {m}")
return m
+ def _run_streaming_case(self) -> Metric:
+ log.info("Start streaming case")
+ try:
+ self._init_read_write_runner()
+ m = self.read_write_runner.run_read_write()
+ except Exception as e:
+ log.warning(f"Failed to run streaming case, reason = {e}")
+ traceback.print_exc()
+ raise e from None
+ else:
+ log.info(f"Streaming case got result: {m}")
+ return m
+
@utils.time_it
def _load_train_data(self):
"""Insert train data and get the insert_duration"""
@@ -222,7 +234,7 @@ def _load_train_data(self):
finally:
runner = None
- def _serial_search(self) -> tuple[float, float, float]:
+ def _serial_search(self) -> tuple[float, float]:
"""Performance serial tests, search the entire test data once,
calculate the recall, serial_latency_p99
@@ -273,10 +285,12 @@ def _optimize(self) -> float:
raise e from None
def _init_search_runner(self):
- test_emb = np.stack(self.ca.dataset.test_data["emb"])
if self.normalize:
+ test_emb = np.stack(self.ca.dataset.test_data)
test_emb = test_emb / np.linalg.norm(test_emb, axis=1)[:, np.newaxis]
- self.test_emb = test_emb.tolist()
+ self.test_emb = test_emb.tolist()
+ else:
+ self.test_emb = self.ca.dataset.test_data
gt_df = self.ca.dataset.gt_data
@@ -298,6 +312,19 @@ def _init_search_runner(self):
k=self.config.case_config.k,
)
+ def _init_read_write_runner(self):
+ ca: StreamingPerformanceCase = self.ca
+ self.read_write_runner = ReadWriteRunner(
+ db=self.db,
+ dataset=ca.dataset,
+ insert_rate=ca.insert_rate,
+ search_stages=ca.search_stages,
+ optimize_after_write=ca.optimize_after_write,
+ read_dur_after_write=ca.read_dur_after_write,
+ concurrencies=ca.concurrencies,
+ k=self.config.case_config.k,
+ )
+
def stop(self):
if self.search_runner:
self.search_runner.stop()
diff --git a/vectordb_bench/frontend/components/check_results/data.py b/vectordb_bench/frontend/components/check_results/data.py
index 94d1b4ea..57f42c28 100644
--- a/vectordb_bench/frontend/components/check_results/data.py
+++ b/vectordb_bench/frontend/components/check_results/data.py
@@ -23,7 +23,7 @@ def getFilterTasks(
task
for task in tasks
if task.task_config.db_name in dbNames
- and task.task_config.case_config.case_id.case_cls(task.task_config.case_config.custom_case).name in caseNames
+ and task.task_config.case_config.case_name in caseNames
]
return filterTasks
@@ -35,17 +35,22 @@ def mergeTasks(tasks: list[CaseResult]):
db = task.task_config.db.value
db_label = task.task_config.db_config.db_label or ""
version = task.task_config.db_config.version or ""
- case = task.task_config.case_config.case_id.case_cls(task.task_config.case_config.custom_case)
+ case = task.task_config.case_config.case
+ case_name = case.name
+ dataset_name = case.dataset.data.full_name
+ filter_rate = case.filter_rate
dbCaseMetricsMap[db_name][case.name] = {
"db": db,
"db_label": db_label,
"version": version,
+ "dataset_name": dataset_name,
+ "filter_rate": filter_rate,
"metrics": mergeMetrics(
- dbCaseMetricsMap[db_name][case.name].get("metrics", {}),
+ dbCaseMetricsMap[db_name][case_name].get("metrics", {}),
asdict(task.metrics),
),
"label": getBetterLabel(
- dbCaseMetricsMap[db_name][case.name].get("label", ResultLabel.FAILED),
+ dbCaseMetricsMap[db_name][case_name].get("label", ResultLabel.FAILED),
task.label,
),
}
@@ -59,12 +64,16 @@ def mergeTasks(tasks: list[CaseResult]):
db_label = metricInfo["db_label"]
version = metricInfo["version"]
label = metricInfo["label"]
+ dataset_name = metricInfo["dataset_name"]
+ filter_rate = metricInfo["filter_rate"]
if label == ResultLabel.NORMAL:
mergedTasks.append(
{
"db_name": db_name,
"db": db,
"db_label": db_label,
+ "dataset_name": dataset_name,
+ "filter_rate": filter_rate,
"version": version,
"case_name": case_name,
"metricsSet": set(metrics.keys()),
diff --git a/vectordb_bench/frontend/components/check_results/filters.py b/vectordb_bench/frontend/components/check_results/filters.py
index 129c1d5a..445fd9d4 100644
--- a/vectordb_bench/frontend/components/check_results/filters.py
+++ b/vectordb_bench/frontend/components/check_results/filters.py
@@ -1,14 +1,17 @@
from vectordb_bench.backend.cases import Case
from vectordb_bench.frontend.components.check_results.data import getChartData
-from vectordb_bench.frontend.components.check_results.expanderStyle import initSidebarExanderStyle
+from vectordb_bench.frontend.components.check_results.expanderStyle import (
+ initSidebarExanderStyle,
+)
from vectordb_bench.frontend.config.dbCaseConfigs import CASE_NAME_ORDER
-from vectordb_bench.frontend.config.styles import *
+from vectordb_bench.frontend.config.styles import SIDEBAR_CONTROL_COLUMNS
import streamlit as st
+from typing import Callable
from vectordb_bench.models import CaseResult, TestResult
-def getshownData(results: list[TestResult], st):
+def getshownData(st, results: list[TestResult], **kwargs):
# hide the nav
st.markdown(
"",
@@ -17,15 +20,20 @@ def getshownData(results: list[TestResult], st):
st.header("Filters")
- shownResults = getshownResults(results, st)
- showDBNames, showCaseNames = getShowDbsAndCases(shownResults, st)
+ shownResults = getshownResults(st, results, **kwargs)
+ showDBNames, showCaseNames = getShowDbsAndCases(st, shownResults)
shownData, failedTasks = getChartData(shownResults, showDBNames, showCaseNames)
return shownData, failedTasks, showCaseNames
-def getshownResults(results: list[TestResult], st) -> list[CaseResult]:
+def getshownResults(
+ st,
+ results: list[TestResult],
+ case_results_filter: Callable[[CaseResult], bool] = lambda x: True,
+ **kwargs,
+) -> list[CaseResult]:
resultSelectOptions = [
result.task_label if result.task_label != result.run_id else f"res-{result.run_id[:4]}" for result in results
]
@@ -41,23 +49,17 @@ def getshownResults(results: list[TestResult], st) -> list[CaseResult]:
)
selectedResult: list[CaseResult] = []
for option in selectedResultSelectedOptions:
- result = results[resultSelectOptions.index(option)].results
- selectedResult += result
+ case_results = results[resultSelectOptions.index(option)].results
+ selectedResult += [r for r in case_results if case_results_filter(r)]
return selectedResult
-def getShowDbsAndCases(result: list[CaseResult], st) -> tuple[list[str], list[str]]:
+def getShowDbsAndCases(st, result: list[CaseResult]) -> tuple[list[str], list[str]]:
initSidebarExanderStyle(st)
allDbNames = list(set({res.task_config.db_name for res in result}))
allDbNames.sort()
- allCases: list[Case] = [
- res.task_config.case_config.case_id.case_cls(res.task_config.case_config.custom_case) for res in result
- ]
- allCaseNameSet = set({case.name for case in allCases})
- allCaseNames = [case_name for case_name in CASE_NAME_ORDER if case_name in allCaseNameSet] + [
- case_name for case_name in allCaseNameSet if case_name not in CASE_NAME_ORDER
- ]
+ allCases: list[Case] = [res.task_config.case_config.case for res in result]
# DB Filter
dbFilterContainer = st.container()
@@ -67,6 +69,12 @@ def getShowDbsAndCases(result: list[CaseResult], st) -> tuple[list[str], list[st
allDbNames,
col=1,
)
+ showCaseNames = []
+
+ allCaseNameSet = set({case.name for case in allCases})
+ allCaseNames = [
+ case_name for case_name in CASE_NAME_ORDER if case_name in allCaseNameSet
+ ] + [case_name for case_name in allCaseNameSet if case_name not in CASE_NAME_ORDER]
# Case Filter
caseFilterContainer = st.container()
diff --git a/vectordb_bench/frontend/components/check_results/headerIcon.py b/vectordb_bench/frontend/components/check_results/headerIcon.py
index 4715701f..cd18251e 100644
--- a/vectordb_bench/frontend/components/check_results/headerIcon.py
+++ b/vectordb_bench/frontend/components/check_results/headerIcon.py
@@ -8,8 +8,8 @@ def drawHeaderIcon(st):