Skip to content

Commit

Permalink
..
Browse files Browse the repository at this point in the history
Signed-off-by: min.tian <min.tian.cn@gmail.com>
  • Loading branch information
alwayslove2013 committed Aug 5, 2024
1 parent 5ef1daf commit 22a1d86
Show file tree
Hide file tree
Showing 26 changed files with 4,376 additions and 299 deletions.
20 changes: 11 additions & 9 deletions vectordb_bench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ class config:
LOG_LEVEL = env.str("LOG_LEVEL", "INFO")

DEFAULT_DATASET_URL = env.str("DEFAULT_DATASET_URL", AWS_S3_URL)
DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", "/tmp/vectordb_bench/dataset")
# DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", "/tmp/vectordb_bench/dataset")
DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", "/home/nas/milvus/tianmin")
NUM_PER_BATCH = env.int("NUM_PER_BATCH", 5000)

DROP_OLD = env.bool("DROP_OLD", True)
USE_SHUFFLED_DATA = env.bool("USE_SHUFFLED_DATA", True)
USE_SHUFFLED_DATA = env.bool("USE_SHUFFLED_DATA", False)

NUM_CONCURRENCY = env.list("NUM_CONCURRENCY", [1, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100], subcast=int )
NUM_CONCURRENCY = env.list("NUM_CONCURRENCY", [10], subcast=int)

CONCURRENCY_DURATION = 30

Expand All @@ -46,14 +47,15 @@ class config:
LOAD_TIMEOUT_1536D_500K = 2.5 * 3600 # 2.5h
LOAD_TIMEOUT_1536D_5M = 25 * 3600 # 25h

OPTIMIZE_TIMEOUT_DEFAULT = 30 * 60 # 30min
OPTIMIZE_TIMEOUT_768D_1M = 30 * 60 # 30min
OPTIMIZE_TIMEOUT_768D_10M = 5 * 3600 # 5h
OPTIMIZE_TIMEOUT_768D_100M = 50 * 3600 # 50h
OPTIMIZE_TIMEOUT_DEFAULT = 30 * 600 # 30min
OPTIMIZE_TIMEOUT_768D_1M = 30 * 600 # 30min
OPTIMIZE_TIMEOUT_768D_10M = 5 * 36000 # 5h
OPTIMIZE_TIMEOUT_768D_100M = 50 * 36000 # 50h


OPTIMIZE_TIMEOUT_1536D_500K = 15 * 60 # 15min
OPTIMIZE_TIMEOUT_1536D_5M = 2.5 * 3600 # 2.5h
OPTIMIZE_TIMEOUT_1536D_500K = 15 * 600 # 15min
OPTIMIZE_TIMEOUT_1536D_5M = 2.5 * 36000 # 2.5h

def display(self) -> str:
tmp = [
i for i in inspect.getmembers(self)
Expand Down
10 changes: 4 additions & 6 deletions vectordb_bench/backend/assembler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .task_runner import CaseRunner, RunningStatus, TaskRunner
from ..models import TaskConfig
from ..backend.clients import EmptyDBCaseConfig
from ..backend.data_source import DatasetSource
from ..backend.data_source import DatasetSource
import logging


Expand All @@ -11,10 +11,8 @@

class Assembler:
@classmethod
def assemble(cls, run_id , task: TaskConfig, source: DatasetSource) -> CaseRunner:
c_cls = task.case_config.case_id.case_cls

c = c_cls(task.case_config.custom_case)
def assemble(cls, run_id, task: TaskConfig, source: DatasetSource) -> CaseRunner:
c = task.case_config.case
if type(task.db_case_config) != EmptyDBCaseConfig:
task.db_case_config.metric_type = c.dataset.data.metric_type

Expand Down Expand Up @@ -55,7 +53,7 @@ def assemble_all(

# sort by dataset size
for k in db2runner.keys():
db2runner[k].sort(key=lambda x:x.ca.dataset.data.size)
db2runner[k].sort(key=lambda x: x.ca.dataset.data.size)

all_runners = []
all_runners.extend(load_runners)
Expand Down
113 changes: 90 additions & 23 deletions vectordb_bench/backend/cases.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import typing
import logging
from enum import Enum, auto
from typing import Type

from vectordb_bench import config
from vectordb_bench.backend.clients.api import MetricType
from vectordb_bench.backend.filters import (
Filter,
FilterType,
IntFilter,
LabelFilter,
NonFilter,
)
from vectordb_bench.base import BaseModel
from vectordb_bench.frontend.components.custom.getCustomConfig import (
CustomDatasetConfig,
)

from .dataset import CustomDataset, Dataset, DatasetManager
from .dataset import CustomDataset, Dataset, DatasetManager, DatasetWithSizeType


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,6 +54,7 @@ class CaseType(Enum):

Custom = 100
PerformanceCustomDataset = 101
LabelFilterPerformanceCase = 102

def case_cls(self, custom_configs: dict | None = None) -> Type["Case"]:
if custom_configs is None:
Expand Down Expand Up @@ -82,46 +88,73 @@ class Case(BaseModel):
label(CaseLabel): performance or load.
dataset(DataSet): dataset for this case runner.
filter_rate(float | None): one of 99% | 1% | None
filters(dict | None): filters for search
filters(Filter): NonFilter / IntFilter / LabelFilter
with_scalar_labels(bool): whether to insert scalar data (labels)
"""

case_id: CaseType
label: CaseLabel
name: str
description: str
dataset: DatasetManager
with_scalar_labels: bool = False

load_timeout: float | int
optimize_timeout: float | int | None = None

filter_rate: float | None = None
filters: Filter = NonFilter()

@property
def filters(self) -> dict | None:
if self.filter_rate is not None:
ID = round(self.filter_rate * self.dataset.data.size)
return {
"metadata": f">={ID}",
"id": ID,
}
def check_scalar_labels(self) -> None:
if self.with_scalar_labels and not self.dataset.data.with_scalar_labels:
raise ValueError(
f"""no scalar_labels data in current dataset ({self.dataset.data.name})"""
)
if self.filters.type == FilterType.Label and not self.with_scalar_labels:
raise ValueError("label-filter cases need scalar_labels data")

def __init__(self, **kwargs):
super().__init__(**kwargs)

# post check
self.check_scalar_labels()

# @property
# def filters(self) -> dict | None:
# if self.filter_rate is not None:
# ID = round(self.filter_rate * self.dataset.data.size)
# return {
# "metadata": f">={ID}",
# "id": ID,
# }

return None
# return None


class CapacityCase(Case, BaseModel):
class CapacityCase(Case):
    """Base class for capacity cases (how many vectors fit before failure)."""

    # Capacity runs are load-phase cases and search without any filter.
    label: CaseLabel = CaseLabel.Load
    filter_rate: float | None = None
    # Timeouts come from the central config module so they can be tuned via env.
    load_timeout: float | int = config.CAPACITY_TIMEOUT_IN_SECONDS
    optimize_timeout: float | int | None = None


class PerformanceCase(Case, BaseModel):
class PerformanceCase(Case):
    """Base class for performance (QPS / recall / latency) cases."""

    # Unfiltered by default; filtering subclasses override filter_rate/filters.
    label: CaseLabel = CaseLabel.Performance
    filter_rate: float | None = None
    # Defaults from config; dataset-specific subclasses override with
    # size-appropriate timeouts.
    load_timeout: float | int = config.LOAD_TIMEOUT_DEFAULT
    optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_DEFAULT


class IntFilterPerformanceCase(PerformanceCase):
    """Performance case whose searches apply an integer-range filter."""

    @property
    def filters(self) -> Filter:
        """Build an IntFilter over the "id" field from this case's filter_rate.

        The cut-off value is ``dataset size * filter_rate`` — assumes ids are
        sequential row ids in [0, size) so the rate maps to a count; TODO confirm
        against the dataset loader.
        """
        int_field = "id"
        int_value = self.dataset.data.size * self.filter_rate
        return IntFilter(
            filter_rate=self.filter_rate, int_field=int_field, int_value=int_value
        )


class CapacityDim960(CapacityCase):
case_id: CaseType = CaseType.CapacityDim960
dataset: DatasetManager = Dataset.GIST.manager(100_000)
Expand Down Expand Up @@ -160,7 +193,7 @@ class Performance768D1M(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_1M


class Performance768D10M1P(PerformanceCase):
class Performance768D10M1P(IntFilterPerformanceCase):
case_id: CaseType = CaseType.Performance768D10M1P
filter_rate: float | int | None = 0.01
dataset: DatasetManager = Dataset.COHERE.manager(10_000_000)
Expand All @@ -171,7 +204,7 @@ class Performance768D10M1P(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_10M


class Performance768D1M1P(PerformanceCase):
class Performance768D1M1P(IntFilterPerformanceCase):
case_id: CaseType = CaseType.Performance768D1M1P
filter_rate: float | int | None = 0.01
dataset: DatasetManager = Dataset.COHERE.manager(1_000_000)
Expand All @@ -182,7 +215,7 @@ class Performance768D1M1P(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_1M


class Performance768D10M99P(PerformanceCase):
class Performance768D10M99P(IntFilterPerformanceCase):
case_id: CaseType = CaseType.Performance768D10M99P
filter_rate: float | int | None = 0.99
dataset: DatasetManager = Dataset.COHERE.manager(10_000_000)
Expand All @@ -193,7 +226,7 @@ class Performance768D10M99P(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_10M


class Performance768D1M99P(PerformanceCase):
class Performance768D1M99P(IntFilterPerformanceCase):
case_id: CaseType = CaseType.Performance768D1M99P
filter_rate: float | int | None = 0.99
dataset: DatasetManager = Dataset.COHERE.manager(1_000_000)
Expand Down Expand Up @@ -237,7 +270,7 @@ class Performance1536D5M(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1536D_5M


class Performance1536D500K1P(PerformanceCase):
class Performance1536D500K1P(IntFilterPerformanceCase):
case_id: CaseType = CaseType.Performance1536D500K1P
filter_rate: float | int | None = 0.01
dataset: DatasetManager = Dataset.OPENAI.manager(500_000)
Expand All @@ -248,7 +281,7 @@ class Performance1536D500K1P(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1536D_500K


class Performance1536D5M1P(PerformanceCase):
class Performance1536D5M1P(IntFilterPerformanceCase):
case_id: CaseType = CaseType.Performance1536D5M1P
filter_rate: float | int | None = 0.01
dataset: DatasetManager = Dataset.OPENAI.manager(5_000_000)
Expand All @@ -259,7 +292,7 @@ class Performance1536D5M1P(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1536D_5M


class Performance1536D500K99P(PerformanceCase):
class Performance1536D500K99P(IntFilterPerformanceCase):
case_id: CaseType = CaseType.Performance1536D500K99P
filter_rate: float | int | None = 0.99
dataset: DatasetManager = Dataset.OPENAI.manager(500_000)
Expand All @@ -270,7 +303,7 @@ class Performance1536D500K99P(PerformanceCase):
optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_1536D_500K


class Performance1536D5M99P(PerformanceCase):
class Performance1536D5M99P(IntFilterPerformanceCase):
case_id: CaseType = CaseType.Performance1536D5M99P
filter_rate: float | int | None = 0.99
dataset: DatasetManager = Dataset.OPENAI.manager(5_000_000)
Expand Down Expand Up @@ -304,6 +337,39 @@ def metric_type_map(s: str) -> MetricType:
raise RuntimeError(err_msg)


class LabelFilterPerformanceCase(PerformanceCase):
    """Performance case that searches with a scalar-label (keyword) filter.

    Requires a dataset that ships scalar label data: ``with_scalar_labels``
    is forced to True, and the base ``Case`` validates the dataset after
    construction.
    """

    case_id: CaseType = CaseType.LabelFilterPerformanceCase
    with_scalar_labels: bool = True

    def __init__(
        self,
        dataset_with_size_type: DatasetWithSizeType,
        label_percentage: float,
        **kwargs,
    ):
        """Build the case from a dataset selector and a label percentage.

        Args:
            dataset_with_size_type: a ``DatasetWithSizeType`` member, or its
                raw value (coerced here), selecting dataset + size and the
                matching load/optimize timeouts.
            label_percentage: fraction in (0, 1] of rows expected to carry the
                filtered label; also shown in the case name/description.
            **kwargs: forwarded to ``PerformanceCase.__init__``.
        """
        # Accept the enum's raw value as well as the enum member itself.
        if not isinstance(dataset_with_size_type, DatasetWithSizeType):
            dataset_with_size_type = DatasetWithSizeType(dataset_with_size_type)
        name = (
            f"Label-Filter-{label_percentage*100:.1f}% - {dataset_with_size_type.value}"
        )
        description = f"Label-Filter-{label_percentage*100:.1f}% Performance Test ({dataset_with_size_type.value})"
        dataset = dataset_with_size_type.get_manager()
        load_timeout = dataset_with_size_type.get_load_timeout()
        optimize_timeout = dataset_with_size_type.get_optimize_timeout()
        # Renamed from `filter`, which shadowed the builtin of the same name.
        label_filter = LabelFilter(label_percentage=label_percentage)
        filter_rate = label_filter.filter_rate
        super().__init__(
            name=name,
            description=description,
            dataset=dataset,
            load_timeout=load_timeout,
            optimize_timeout=optimize_timeout,
            filter_rate=filter_rate,
            filters=label_filter,
            **kwargs,
        )


class PerformanceCustomDataset(PerformanceCase):
case_id: CaseType = CaseType.PerformanceCustomDataset
name: str = "Performance With Custom Dataset"
Expand Down Expand Up @@ -357,4 +423,5 @@ def __init__(
CaseType.Performance1536D5M99P: Performance1536D5M99P,
CaseType.Performance1536D50K: Performance1536D50K,
CaseType.PerformanceCustomDataset: PerformanceCustomDataset,
CaseType.LabelFilterPerformanceCase: LabelFilterPerformanceCase,
}
12 changes: 6 additions & 6 deletions vectordb_bench/backend/clients/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from pydantic import BaseModel, validator, SecretStr

from vectordb_bench.backend.filters import Filter


class MetricType(str, Enum):
L2 = "L2"
Expand Down Expand Up @@ -167,13 +169,11 @@ def insert_embeddings(
"""
raise NotImplementedError

def prepare_filters(self, filters: Filter):
    """Hook called with the case's Filter before searching.

    Default is a no-op. NOTE(review): concrete clients presumably override
    this to pre-build a DB-specific filter expression once instead of per
    query — confirm against the client implementations.
    """
    pass

@abstractmethod
def search_embedding(
self,
query: list[float],
k: int = 100,
filters: dict | None = None,
) -> list[int]:
def search_embedding(self, query: list[float], k: int = 100, **kwargs) -> list[int]:
"""Get k most similar embeddings to query vector.
Args:
Expand Down
15 changes: 10 additions & 5 deletions vectordb_bench/backend/clients/milvus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


class MilvusConfig(DBConfig):
uri: SecretStr = "http://localhost:19530"
uri: SecretStr = "http://10.104.22.89:19530"

def to_dict(self) -> dict:
return {"uri": self.uri.get_secret_value()}
Expand All @@ -14,10 +14,14 @@ class MilvusIndexConfig(BaseModel):

index: IndexType
metric_type: MetricType | None = None

@property
def is_gpu_index(self) -> bool:
return self.index in [IndexType.GPU_CAGRA, IndexType.GPU_IVF_FLAT, IndexType.GPU_IVF_PQ]
return self.index in [
IndexType.GPU_CAGRA,
IndexType.GPU_IVF_FLAT,
IndexType.GPU_IVF_PQ,
]

def parse_metric(self) -> str:
if not self.metric_type:
Expand Down Expand Up @@ -99,7 +103,8 @@ def search_param(self) -> dict:
"metric_type": self.parse_metric(),
"params": {"nprobe": self.nprobe},
}



class IVFSQ8Config(MilvusIndexConfig, DBCaseConfig):
nlist: int
nprobe: int | None = None
Expand Down Expand Up @@ -196,7 +201,7 @@ class GPUCAGRAConfig(MilvusIndexConfig, DBCaseConfig):
search_width: int = 4
min_iterations: int = 0
max_iterations: int = 0
build_algo: str = "IVF_PQ" # IVF_PQ; NN_DESCENT;
build_algo: str = "IVF_PQ" # IVF_PQ; NN_DESCENT;
cache_dataset_on_device: str
refine_ratio: float | None = None
index: IndexType = IndexType.GPU_CAGRA
Expand Down
Loading

0 comments on commit 22a1d86

Please sign in to comment.