Skip to content

Commit

Permalink
new metric support: ndcg@100, performance under conc_test
Browse files Browse the repository at this point in the history
Signed-off-by: min.tian <min.tian.cn@gmail.com>
  • Loading branch information
alwayslove2013 authored and XuanYang-cn committed Jun 28, 2024
1 parent 5265f2f commit 09306a0
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 18 deletions.
17 changes: 14 additions & 3 deletions vectordb_bench/backend/runner/mp_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import multiprocessing as mp
import logging
from typing import Iterable
import numpy as np
from ..clients import api
from ... import config

Expand Down Expand Up @@ -49,6 +50,7 @@ def search(self, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition)

start_time = time.perf_counter()
count = 0
latencies = []
while time.perf_counter() < start_time + self.duration:
s = time.perf_counter()
try:
Expand All @@ -61,7 +63,8 @@ def search(self, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition)
log.warning(f"VectorDB search_embedding error: {e}")
traceback.print_exc(chain=True)
raise e from None


latencies.append(time.perf_counter() - s)
count += 1
# loop through the test data
idx = idx + 1 if idx < num - 1 else 0
Expand All @@ -75,7 +78,7 @@ def search(self, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition)
f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}"
)

return (count, total_dur)
return (count, total_dur, latencies)

@staticmethod
def get_mp_context():
Expand All @@ -85,6 +88,9 @@ def get_mp_context():

def _run_all_concurrencies_mem_efficient(self) -> float:
max_qps = 0
conc_num_list = []
conc_qps_list = []
conc_latency_p99_list = []
try:
for conc in self.concurrencies:
with mp.Manager() as m:
Expand All @@ -103,9 +109,14 @@ def _run_all_concurrencies_mem_efficient(self) -> float:

start = time.perf_counter()
all_count = sum([r.result()[0] for r in future_iter])
latencies = sum([r.result()[2] for r in future_iter], start=[])
latency_p99 = np.percentile(latencies, 0.99)
cost = time.perf_counter() - start

qps = round(all_count / cost, 4)
conc_num_list.append(conc)
conc_qps_list.append(qps)
conc_latency_p99_list.append(latency_p99)
log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")

if qps > max_qps:
Expand All @@ -122,7 +133,7 @@ def _run_all_concurrencies_mem_efficient(self) -> float:
finally:
self.stop()

return max_qps
return max_qps, conc_num_list, conc_qps_list, conc_latency_p99_list

def run(self) -> float:
"""
Expand Down
10 changes: 7 additions & 3 deletions vectordb_bench/backend/runner/serial_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pandas as pd

from ..clients import api
from ...metric import calc_recall
from ...metric import calc_ndcg, calc_recall, get_ideal_dcg
from ...models import LoadTimeoutError, PerformanceTimeoutError
from .. import utils
from ... import config
Expand Down Expand Up @@ -171,11 +171,12 @@ def search(self, args: tuple[list, pd.DataFrame]):
log.info(f"{mp.current_process().name:14} start search the entire test_data to get recall and latency")
with self.db.init():
test_data, ground_truth = args
ideal_dcg = get_ideal_dcg(self.k)

log.debug(f"test dataset size: {len(test_data)}")
log.debug(f"ground truth size: {ground_truth.columns}, shape: {ground_truth.shape}")

latencies, recalls = [], []
latencies, recalls, ndcgs = [], [], []
for idx, emb in enumerate(test_data):
s = time.perf_counter()
try:
Expand All @@ -194,24 +195,27 @@ def search(self, args: tuple[list, pd.DataFrame]):

gt = ground_truth['neighbors_id'][idx]
recalls.append(calc_recall(self.k, gt[:self.k], results))
ndcgs.append(calc_ndcg(gt[:self.k], results, ideal_dcg))


if len(latencies) % 100 == 0:
log.debug(f"({mp.current_process().name:14}) search_count={len(latencies):3}, latest_latency={latencies[-1]}, latest recall={recalls[-1]}")

avg_latency = round(np.mean(latencies), 4)
avg_recall = round(np.mean(recalls), 4)
avg_ndcg = round(np.mean(ndcgs), 4)
cost = round(np.sum(latencies), 4)
p99 = round(np.percentile(latencies, 99), 4)
log.info(
f"{mp.current_process().name:14} search entire test_data: "
f"cost={cost}s, "
f"queries={len(latencies)}, "
f"avg_recall={avg_recall}, "
f"avg_ndcg={avg_ndcg},"
f"avg_latency={avg_latency}, "
f"p99={p99}"
)
return (avg_recall, p99)
return (avg_recall, avg_ndcg, p99)


def _run_in_subprocess(self) -> tuple[float, float]:
Expand Down
8 changes: 5 additions & 3 deletions vectordb_bench/backend/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
)
self._init_search_runner()
m.qps = self._conc_search()
m.qps, m.conc_num_list, m.conc_qps_list, m.conc_latency_p99_list = self._conc_search()
m.recall, m.serial_latency_p99 = self._serial_search()
'''

Expand Down Expand Up @@ -181,10 +182,11 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
m.recall = search_results.recall
m.serial_latencies = search_results.serial_latencies
'''
m.recall, m.serial_latency_p99 = search_results
m.recall, m.ndcg, m.serial_latency_p99 = search_results
if TaskStage.SEARCH_CONCURRENT in self.config.stages:
search_results = self._conc_search()
m.qps = search_results
m.qps, m.conc_num_list, m.conc_qps_list, m.conc_latency_p99_list = search_results

except Exception as e:
log.warning(f"Failed to run performance case, reason = {e}")
traceback.print_exc()
Expand Down
19 changes: 11 additions & 8 deletions vectordb_bench/frontend/components/check_results/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,18 @@ def mergeMetrics(metrics_1: dict, metrics_2: dict) -> dict:


def getBetterMetric(metric, value_1, value_2):
if value_1 < 1e-7:
return value_2
if value_2 < 1e-7:
try:
if value_1 < 1e-7:
return value_2
if value_2 < 1e-7:
return value_1
return (
min(value_1, value_2)
if isLowerIsBetterMetric(metric)
else max(value_1, value_2)
)
except Exception:
return value_1
return (
min(value_1, value_2)
if isLowerIsBetterMetric(metric)
else max(value_1, value_2)
)


def getBetterLabel(label_1: ResultLabel, label_2: ResultLabel):
Expand Down
82 changes: 82 additions & 0 deletions vectordb_bench/frontend/components/concurrent/charts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@


from vectordb_bench.backend.cases import Case
from vectordb_bench.frontend.components.check_results.expanderStyle import initMainExpanderStyle
import plotly.express as px

from vectordb_bench.frontend.const.styles import COLOR_MAP


def drawChartsByCase(allData, cases: list[Case], st):
initMainExpanderStyle(st)
for case in cases:
chartContainer = st.expander(case.name, True)
caseDataList = [
data for data in allData if data["case_name"] == case.name]
data = [{
"conc_num": caseData["conc_num_list"][i],
"qps": caseData["conc_qps_list"][i],
"latency_p99": caseData["conc_latency_p99_list"][i] * 1000,
"db_name": caseData["db_name"],
"db": caseData["db"]

} for caseData in caseDataList for i in range(len(caseData["conc_num_list"]))]
drawChart(data, chartContainer)


def getRange(metric, data, padding_multipliers):
minV = min([d.get(metric, 0) for d in data])
maxV = max([d.get(metric, 0) for d in data])
padding = maxV - minV
rangeV = [
minV - padding * padding_multipliers[0],
maxV + padding * padding_multipliers[1],
]
return rangeV


def drawChart(data, st):
if len(data) == 0:
return

x = "latency_p99"
xrange = getRange(x, data, [0.05, 0.1])

y = "qps"
yrange = getRange(y, data, [0.2, 0.1])

color = "db"
color_discrete_map = COLOR_MAP
color = "db_name"
color_discrete_map = None
line_group = "db_name"
text = "conc_num"

data.sort(key=lambda a: a["conc_num"])

fig = px.line(
data,
x=x,
y=y,
color=color,
color_discrete_map=color_discrete_map,
line_group=line_group,
text=text,
markers=True,
# color_discrete_map=color_discrete_map,
hover_data={
"conc_num": True,
},
height=720,
)
fig.update_xaxes(range=xrange, title_text="Latency P99 (ms)")
fig.update_yaxes(range=yrange, title_text="QPS")
fig.update_traces(textposition="bottom right",
texttemplate="conc-%{text:,.4~r}")
# fig.update_layout(
# margin=dict(l=0, r=0, t=40, b=0, pad=8),
# legend=dict(
# orientation="h", yanchor="bottom", y=1, xanchor="right", x=1, title=""
# ),
# )
st.plotly_chart(fig, use_container_width=True,)
44 changes: 44 additions & 0 deletions vectordb_bench/frontend/components/tables/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from dataclasses import asdict
from vectordb_bench.backend.cases import CaseType
from vectordb_bench.interface import benchMarkRunner
from vectordb_bench.models import CaseResult, ResultLabel
import pandas as pd


def getNewResults():
allResults = benchMarkRunner.get_results()
newResults: list[CaseResult] = []

for res in allResults:
results = res.results
for result in results:
if result.label == ResultLabel.NORMAL:
newResults.append(result)


df = pd.DataFrame(formatData(newResults))
return df


def formatData(caseResults: list[CaseResult]):
data = []
for caseResult in caseResults:
db = caseResult.task_config.db.value
db_label = caseResult.task_config.db_config.db_label
case_config = caseResult.task_config.case_config
db_case_config = caseResult.task_config.db_case_config
case = case_config.case_id.case_cls()
filter_rate = case.filter_rate
dataset = case.dataset.data.name
metrics = asdict(caseResult.metrics)
data.append(
{
"db": db,
"db_label": db_label,
"case_name": case.name,
"dataset": dataset,
"filter_rate": filter_rate,
**metrics,
}
)
return data
72 changes: 72 additions & 0 deletions vectordb_bench/frontend/pages/concurrent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@



import streamlit as st
from vectordb_bench.backend.cases import CaseType
from vectordb_bench.frontend.components.check_results.footer import footer
from vectordb_bench.frontend.components.check_results.expanderStyle import initMainExpanderStyle
from vectordb_bench.frontend.components.check_results.priceTable import priceTable
from vectordb_bench.frontend.components.check_results.headerIcon import drawHeaderIcon
from vectordb_bench.frontend.components.check_results.nav import NavToResults, NavToRunTest
from vectordb_bench.frontend.components.check_results.charts import drawMetricChart
from vectordb_bench.frontend.components.check_results.filters import getshownData
from vectordb_bench.frontend.components.concurrent.charts import drawChartsByCase
from vectordb_bench.frontend.components.get_results.saveAsImage import getResults
from vectordb_bench.frontend.const.styles import *
from vectordb_bench.interface import benchMarkRunner
from vectordb_bench.models import TestResult


def main():
# set page config
st.set_page_config(
page_title="VDBBench Conc Perf",
page_icon=FAVICON,
layout="wide",
# initial_sidebar_state="collapsed",
)

# header
drawHeaderIcon(st)

allResults = benchMarkRunner.get_results()

def check_conc_data(res: TestResult):
case_results = res.results
count = 0
for case_result in case_results:
if len(case_result.metrics.conc_num_list) > 0:
count += 1

return count > 0

checkedResults = [res for res in allResults if check_conc_data(res)]


st.title("VectorDB Benchmark (Concurrent Performance)")

# results selector
resultSelectorContainer = st.sidebar.container()
shownData, _, showCases = getshownData(
checkedResults, resultSelectorContainer)


resultSelectorContainer.divider()

# nav
navContainer = st.sidebar.container()
NavToRunTest(navContainer)
NavToResults(navContainer)

# save or share
resultesContainer = st.sidebar.container()
getResults(resultesContainer, "vectordb_bench_concurrent")

drawChartsByCase(shownData, showCases, st.container())

# footer
footer(st.container())


if __name__ == "__main__":
main()
24 changes: 24 additions & 0 deletions vectordb_bench/frontend/pages/tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import streamlit as st
from vectordb_bench.frontend.components.check_results.headerIcon import drawHeaderIcon
from vectordb_bench.frontend.components.tables.data import getNewResults
from vectordb_bench.frontend.const.styles import FAVICON


def main():
# set page config
st.set_page_config(
page_title="Table",
page_icon=FAVICON,
layout="wide",
# initial_sidebar_state="collapsed",
)

# header
drawHeaderIcon(st)

df = getNewResults()
st.dataframe(df, height=800)


if __name__ == "__main__":
main()
Loading

0 comments on commit 09306a0

Please sign in to comment.