Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hf_transfer to testing image #29

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .github/workflows/codespell.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: codespell

on:
# Trigger the workflow on push or pull request,
# but only for the main branch
push:
branches:
- main
paths:
- "**/*.py"
- "**/*.md"
- "**/*.rst"
- pyproject.toml
- requirements-lint.txt
- .github/workflows/codespell.yml
pull_request:
branches:
- main
paths:
- "**/*.py"
- "**/*.md"
- "**/*.rst"
- pyproject.toml
- requirements-lint.txt
- .github/workflows/codespell.yml

jobs:
codespell:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.12"]
steps:
- uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # v4.2.1
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-lint.txt
- name: Spelling check with codespell
run: |
codespell --toml pyproject.toml
2 changes: 1 addition & 1 deletion .github/workflows/mypy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ jobs:
- name: Mypy
run: |
echo "::add-matcher::.github/workflows/matchers/mypy.json"
tools/mypy.sh 1
tools/mypy.sh 1 ${{ matrix.python-version }}
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ ADD . /vllm-workspace/
RUN --mount=type=cache,target=/root/.cache/pip \
python3 -m pip install -r requirements-dev.txt

# enable fast downloads from hf (for testing)
RUN --mount=type=cache,target=/root/.cache/pip \
python3 -m pip install hf_transfer
ENV HF_HUB_ENABLE_HF_TRANSFER 1

# Copy in the v1 package for testing (it isn't distributed yet)
COPY vllm/v1 /usr/local/lib/python3.12/dist-packages/vllm/v1

Expand Down
57 changes: 52 additions & 5 deletions benchmarks/benchmark_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,17 +297,43 @@ def sample_random_requests(
async def get_request(
input_requests: List[Tuple[str, int, int]],
request_rate: float,
burstiness: float = 1.0,
) -> AsyncGenerator[Tuple[str, int, int], None]:
"""
Asynchronously generates requests at a specified rate
with OPTIONAL burstiness.

Args:
input_requests:
A list of input requests, each represented as a tuple.
request_rate:
The rate at which requests are generated (requests/s).
burstiness (optional):
The burstiness factor of the request generation.
Only takes effect when request_rate is not inf.
Default value is 1, which follows a Poisson process.
Otherwise, the request intervals follow a gamma distribution.
A lower burstiness value (0 < burstiness < 1) results
in more bursty requests, while a higher burstiness value
(burstiness > 1) results in a more uniform arrival of requests.
"""
input_requests = iter(input_requests)

# Calculate scale parameter theta to maintain the desired request_rate.
assert burstiness > 0, (
f"A positive burstiness factor is expected, but given {burstiness}.")
theta = 1.0 / (request_rate * burstiness)

for request in input_requests:
yield request

if request_rate == float("inf"):
# If the request rate is infinity, then we don't need to wait.
continue

# Sample the request interval from the exponential distribution.
interval = np.random.exponential(1.0 / request_rate)
# Sample the request interval from the gamma distribution.
# If burstiness is 1, it follows exponential distribution.
interval = np.random.gamma(shape=burstiness, scale=theta)
# The next request will be sent after the interval.
await asyncio.sleep(interval)

Expand Down Expand Up @@ -426,6 +452,7 @@ async def benchmark(
logprobs: Optional[int],
best_of: int,
request_rate: float,
burstiness: float,
disable_tqdm: bool,
profile: bool,
selected_percentile_metrics: List[str],
Expand Down Expand Up @@ -480,7 +507,13 @@ async def benchmark(
if profile_output.success:
print("Profiler started")

if burstiness == 1.0:
distribution = "Poisson process"
else:
distribution = "Gamma distribution"

print(f"Traffic request rate: {request_rate}")
print(f"Burstiness factor: {burstiness} ({distribution})")
print(f"Maximum request concurrency: {max_concurrency}")

pbar = None if disable_tqdm else tqdm(total=len(input_requests))
Expand All @@ -502,7 +535,7 @@ async def limited_request_func(request_func_input, pbar):

benchmark_start_time = time.perf_counter()
tasks: List[asyncio.Task] = []
async for request in get_request(input_requests, request_rate):
async for request in get_request(input_requests, request_rate, burstiness):
prompt, prompt_len, output_len, mm_content = request
request_func_input = RequestFuncInput(model=model_id,
prompt=prompt,
Expand Down Expand Up @@ -769,6 +802,7 @@ def main(args: argparse.Namespace):
logprobs=args.logprobs,
best_of=args.best_of,
request_rate=args.request_rate,
burstiness=args.burstiness,
disable_tqdm=args.disable_tqdm,
profile=args.profile,
selected_percentile_metrics=args.percentile_metrics.split(","),
Expand Down Expand Up @@ -807,6 +841,7 @@ def main(args: argparse.Namespace):
# Traffic
result_json["request_rate"] = (
args.request_rate if args.request_rate < float("inf") else "inf")
result_json["burstiness"] = args.burstiness
result_json["max_concurrency"] = args.max_concurrency

# Merge with benchmark result
Expand Down Expand Up @@ -922,8 +957,20 @@ def main(args: argparse.Namespace):
default=float("inf"),
help="Number of requests per second. If this is inf, "
"then all the requests are sent at time 0. "
"Otherwise, we use Poisson process to synthesize "
"the request arrival times.",
"Otherwise, we use Poisson process or gamma distribution "
"to synthesize the request arrival times.",
)
parser.add_argument(
"--burstiness",
type=float,
default=1.0,
help="Burstiness factor of the request generation. "
"Only take effect when request_rate is not inf. "
"Default value is 1, which follows Poisson process. "
"Otherwise, the request intervals follow a gamma distribution. "
"A lower burstiness value (0 < burstiness < 1) results in more "
"bursty requests. A higher burstiness value (burstiness > 1) "
"results in a more uniform arrival of requests.",
)
parser.add_argument("--seed", type=int, default=0)
parser.add_argument(
Expand Down
2 changes: 1 addition & 1 deletion cmake/cpu_extension.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ if (AVX512_FOUND AND NOT AVX512_DISABLED)
FetchContent_Declare(
oneDNN
GIT_REPOSITORY https://github.com/oneapi-src/oneDNN.git
GIT_TAG v3.5.3
GIT_TAG v3.6
GIT_PROGRESS TRUE
GIT_SHALLOW TRUE
)
Expand Down
10 changes: 10 additions & 0 deletions csrc/cpu/cpu_types_x86.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,16 @@ struct FP32Vec16 : public Vec<FP32Vec16> {
explicit FP32Vec16(const FP32Vec8 &data)
: reg_low(data.reg), reg_high(data.reg) {}

explicit FP32Vec16(const FP16Vec16 &v) {
__m128i low = _mm256_extractf128_si256(v.reg, 0);
__m128i high = _mm256_extractf128_si256(v.reg, 1);

reg_low = _mm256_cvtph_ps(low);
reg_high = _mm256_cvtph_ps(high);
}

explicit FP32Vec16(const FP16Vec8 &v) : FP32Vec16(FP32Vec8(v)) {}

explicit FP32Vec16(const BF16Vec16 &v) {
__m128i low = _mm256_extractf128_si256(v.reg, 0);
__m128i high = _mm256_extractf128_si256(v.reg, 1);
Expand Down
119 changes: 61 additions & 58 deletions csrc/custom_all_reduce.cu
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,29 @@

#include "custom_all_reduce.cuh"

// fake pointer type, must match fptr_t type in ops.h
// Fake pointer type, must match fptr_t type in ops.h.
// We use this type alias to indicate when pointers are passed in as int64_t.
using fptr_t = int64_t;
static_assert(sizeof(void*) == sizeof(fptr_t));

fptr_t init_custom_ar(torch::Tensor& meta, torch::Tensor& rank_data,
const std::vector<std::string>& handles,
const std::vector<int64_t>& offsets, int64_t rank,
fptr_t init_custom_ar(const std::vector<fptr_t>& fake_ipc_ptrs,
torch::Tensor& rank_data, int64_t rank,
bool full_nvlink) {
int world_size = offsets.size();
int world_size = fake_ipc_ptrs.size();
if (world_size > 8)
throw std::invalid_argument("world size > 8 is not supported");
if (world_size % 2 != 0)
throw std::invalid_argument("Odd num gpus is not supported for now");
if (world_size != handles.size())
throw std::invalid_argument(
"handles length should equal to offsets length");
if (rank < 0 || rank >= world_size)
throw std::invalid_argument("invalid rank passed in");

cudaIpcMemHandle_t ipc_handles[8];
vllm::Signal* ipc_ptrs[8];
for (int i = 0; i < world_size; i++) {
std::memcpy(&ipc_handles[i], handles[i].data(), sizeof(cudaIpcMemHandle_t));
ipc_ptrs[i] = reinterpret_cast<vllm::Signal*>(fake_ipc_ptrs[i]);
}
return (fptr_t) new vllm::CustomAllreduce(
reinterpret_cast<vllm::Signal*>(meta.data_ptr()), rank_data.data_ptr(),
rank_data.numel(), ipc_handles, offsets, rank, full_nvlink);
return (fptr_t) new vllm::CustomAllreduce(ipc_ptrs, rank_data.data_ptr(),
rank_data.numel(), rank, world_size,
full_nvlink);
}

/**
Expand All @@ -55,26 +52,48 @@ bool _is_weak_contiguous(torch::Tensor& t) {
t.numel() * t.element_size());
}

void _all_reduce(fptr_t _fa, torch::Tensor& inp, torch::Tensor& out,
cudaStream_t stream) {
/**
* Performs an out-of-place allreduce and stores result in out.
*
* If _reg_buffer is null, assumes inp.data_ptr() is already IPC-registered.
* Otherwise, _reg_buffer is assumed to be IPC-registered and inp is first
* copied into _reg_buffer.
*/
void all_reduce(fptr_t _fa, torch::Tensor& inp, torch::Tensor& out,
fptr_t _reg_buffer, int64_t reg_buffer_sz_bytes) {
auto fa = reinterpret_cast<vllm::CustomAllreduce*>(_fa);
const at::cuda::OptionalCUDAGuard device_guard(device_of(inp));
auto stream = c10::cuda::getCurrentCUDAStream().stream();

TORCH_CHECK_EQ(inp.scalar_type(), out.scalar_type());
TORCH_CHECK_EQ(inp.numel(), out.numel());
TORCH_CHECK(_is_weak_contiguous(out));
TORCH_CHECK(_is_weak_contiguous(inp));
auto input_size = inp.numel() * inp.element_size();
auto reg_buffer = reinterpret_cast<void*>(_reg_buffer);
if (reg_buffer) {
TORCH_CHECK_LE(input_size, reg_buffer_sz_bytes);
AT_CUDA_CHECK(cudaMemcpyAsync(reg_buffer, inp.data_ptr(), input_size,
cudaMemcpyDeviceToDevice, stream));
} else {
reg_buffer = inp.data_ptr();
}
switch (out.scalar_type()) {
case at::ScalarType::Float: {
fa->allreduce<float>(stream, reinterpret_cast<float*>(inp.data_ptr()),
fa->allreduce<float>(stream, reinterpret_cast<float*>(reg_buffer),
reinterpret_cast<float*>(out.data_ptr()),
out.numel());
break;
}
case at::ScalarType::Half: {
fa->allreduce<half>(stream, reinterpret_cast<half*>(inp.data_ptr()),
fa->allreduce<half>(stream, reinterpret_cast<half*>(reg_buffer),
reinterpret_cast<half*>(out.data_ptr()), out.numel());
break;
}
#if (__CUDA_ARCH__ >= 800 || !defined(__CUDA_ARCH__))
case at::ScalarType::BFloat16: {
fa->allreduce<nv_bfloat16>(
stream, reinterpret_cast<nv_bfloat16*>(inp.data_ptr()),
stream, reinterpret_cast<nv_bfloat16*>(reg_buffer),
reinterpret_cast<nv_bfloat16*>(out.data_ptr()), out.numel());
break;
}
Expand All @@ -85,57 +104,41 @@ void _all_reduce(fptr_t _fa, torch::Tensor& inp, torch::Tensor& out,
}
}

void all_reduce_reg(fptr_t _fa, torch::Tensor& inp, torch::Tensor& out) {
const at::cuda::OptionalCUDAGuard device_guard(device_of(inp));
auto stream = c10::cuda::getCurrentCUDAStream().stream();
TORCH_CHECK_EQ(inp.scalar_type(), out.scalar_type());
TORCH_CHECK_EQ(inp.numel(), out.numel());
_all_reduce(_fa, inp, out, stream);
}

void all_reduce_unreg(fptr_t _fa, torch::Tensor& inp, torch::Tensor& reg_buffer,
torch::Tensor& out) {
const at::cuda::OptionalCUDAGuard device_guard(device_of(inp));
auto stream = c10::cuda::getCurrentCUDAStream().stream();

auto input_size = inp.numel() * inp.element_size();
TORCH_CHECK_EQ(inp.scalar_type(), out.scalar_type());
TORCH_CHECK_EQ(inp.numel(), out.numel());
TORCH_CHECK(input_size <= reg_buffer.numel() * reg_buffer.element_size(),
"registered buffer is too small to contain the input");
AT_CUDA_CHECK(cudaMemcpyAsync(reg_buffer.data_ptr(), inp.data_ptr(),
input_size, cudaMemcpyDeviceToDevice, stream));
_all_reduce(_fa, reg_buffer, out, stream);
}

void dispose(fptr_t _fa) {
auto fa = reinterpret_cast<vllm::CustomAllreduce*>(_fa);
delete fa;
delete reinterpret_cast<vllm::CustomAllreduce*>(_fa);
}

int64_t meta_size() { return sizeof(vllm::Signal); }

void register_buffer(fptr_t _fa, torch::Tensor& t,
const std::vector<std::string>& handles,
const std::vector<int64_t>& offsets) {
void register_buffer(fptr_t _fa, const std::vector<fptr_t>& fake_ipc_ptrs) {
auto fa = reinterpret_cast<vllm::CustomAllreduce*>(_fa);
fa->register_buffer(handles, offsets, t.data_ptr());
TORCH_CHECK(fake_ipc_ptrs.size() == fa->world_size_);
void* ipc_ptrs[8];
for (int i = 0; i < fake_ipc_ptrs.size(); i++) {
ipc_ptrs[i] = reinterpret_cast<void*>(fake_ipc_ptrs[i]);
}
fa->register_buffer(ipc_ptrs);
}

std::tuple<torch::Tensor, std::vector<int64_t>> get_graph_buffer_ipc_meta(
fptr_t _fa) {
// Use vector<int64_t> to represent byte data for python binding compatibility.
std::tuple<std::vector<int64_t>, std::vector<int64_t>>
get_graph_buffer_ipc_meta(fptr_t _fa) {
auto fa = reinterpret_cast<vllm::CustomAllreduce*>(_fa);
auto [handle_bytes, offsets] = fa->get_graph_buffer_ipc_meta();
auto options =
torch::TensorOptions().dtype(torch::kUInt8).device(torch::kCPU);
auto handles =
torch::empty({static_cast<int64_t>(handle_bytes.size())}, options);
std::memcpy(handles.data_ptr(), handle_bytes.data(), handle_bytes.size());
return {handles, std::move(offsets)};
auto [handle, offsets] = fa->get_graph_buffer_ipc_meta();
std::vector<int64_t> bytes(handle.begin(), handle.end());
return std::make_tuple(bytes, offsets);
}

void register_graph_buffers(fptr_t _fa, const std::vector<std::string>& handles,
// Use vector<int64_t> to represent byte data for python binding compatibility.
void register_graph_buffers(fptr_t _fa,
const std::vector<std::vector<int64_t>>& handles,
const std::vector<std::vector<int64_t>>& offsets) {
auto fa = reinterpret_cast<vllm::CustomAllreduce*>(_fa);
fa->register_graph_buffers(handles, offsets);
std::vector<std::string> bytes;
bytes.reserve(handles.size());
for (int i = 0; i < handles.size(); i++) {
bytes.emplace_back(handles[i].begin(), handles[i].end());
}
bytes.reserve(handles.size());
fa->register_graph_buffers(bytes, offsets);
}
Loading