From 1b62745b1d00153c5e99879edaf0c2d7ceb4e2c6 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Sat, 7 Dec 2024 09:33:45 -0800 Subject: [PATCH] [core][executor] simplify instance id (#10976) Signed-off-by: youkaichao --- vllm/config.py | 7 ++++++- vllm/envs.py | 6 ------ vllm/executor/cpu_executor.py | 6 +----- vllm/executor/multiproc_gpu_executor.py | 5 +---- vllm/executor/ray_gpu_executor.py | 7 +------ vllm/executor/ray_hpu_executor.py | 7 +------ vllm/executor/ray_tpu_executor.py | 6 +----- vllm/executor/ray_xpu_executor.py | 6 +----- vllm/utils.py | 25 +++++++++---------------- vllm/worker/worker_base.py | 2 +- 10 files changed, 22 insertions(+), 55 deletions(-) diff --git a/vllm/config.py b/vllm/config.py index db7046ab2c22d..d1c4f995ad015 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -27,7 +27,8 @@ get_hf_text_config, get_pooling_config, get_sentence_transformer_tokenizer_config, is_encoder_decoder, uses_mrope) from vllm.utils import (GiB_bytes, cuda_device_count_stateless, get_cpu_memory, - print_warning_once, resolve_obj_by_qualname) + print_warning_once, random_uuid, + resolve_obj_by_qualname) if TYPE_CHECKING: from ray.util.placement_group import PlacementGroup @@ -2408,6 +2409,7 @@ class VllmConfig: init=True) # type: ignore kv_transfer_config: KVTransferConfig = field(default=None, init=True) # type: ignore + instance_id: str = "" @staticmethod def get_graph_batch_size(batch_size: int) -> int: @@ -2573,6 +2575,9 @@ def __post_init__(self): current_platform.check_and_update_config(self) + if not self.instance_id: + self.instance_id = random_uuid()[:5] + def __str__(self): return ("model=%r, speculative_config=%r, tokenizer=%r, " "skip_tokenizer_init=%s, tokenizer_mode=%s, revision=%s, " diff --git a/vllm/envs.py b/vllm/envs.py index 28797ac1e4af2..ab12a7b48dc53 100644 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -8,7 +8,6 @@ VLLM_RPC_BASE_PATH: str = tempfile.gettempdir() VLLM_USE_MODELSCOPE: bool = False VLLM_RINGBUFFER_WARNING_INTERVAL: int = 60 - VLLM_INSTANCE_ID: Optional[str] = None VLLM_NCCL_SO_PATH: Optional[str] = None LD_LIBRARY_PATH: Optional[str] = None VLLM_USE_TRITON_FLASH_ATTN: bool = False @@ -175,11 +174,6 @@ def get_default_config_root(): "VLLM_USE_MODELSCOPE": lambda: os.environ.get("VLLM_USE_MODELSCOPE", "False").lower() == "true", - # Instance id represents an instance of the VLLM. All processes in the same - # instance should have the same instance id. - "VLLM_INSTANCE_ID": - lambda: os.environ.get("VLLM_INSTANCE_ID", None), - # Interval in seconds to log a warning message when the ring buffer is full "VLLM_RINGBUFFER_WARNING_INTERVAL": lambda: int(os.environ.get("VLLM_RINGBUFFER_WARNING_INTERVAL", "60")), diff --git a/vllm/executor/cpu_executor.py b/vllm/executor/cpu_executor.py index 6b4cb5a9a1d61..2816b5c5c1f88 100644 --- a/vllm/executor/cpu_executor.py +++ b/vllm/executor/cpu_executor.py @@ -10,8 +10,7 @@ from vllm.model_executor.layers.sampler import SamplerOutput from vllm.prompt_adapter.request import PromptAdapterRequest from vllm.sequence import ExecuteModelRequest -from vllm.utils import (get_distributed_init_method, get_open_port, - get_vllm_instance_id, make_async) +from vllm.utils import get_distributed_init_method, get_open_port, make_async from vllm.worker.worker_base import WorkerWrapperBase logger = init_logger(__name__) @@ -31,9 +30,6 @@ def _init_executor(self) -> None: # Environment variables for CPU executor # - # Ensure that VLLM_INSTANCE_ID is set, to be inherited by workers - os.environ["VLLM_INSTANCE_ID"] = get_vllm_instance_id() - # Disable torch async compiling which won't work with daemonic processes os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1" diff --git a/vllm/executor/multiproc_gpu_executor.py b/vllm/executor/multiproc_gpu_executor.py index a6c05a71d2b6f..c450209f0eb91 100644 --- a/vllm/executor/multiproc_gpu_executor.py +++ b/vllm/executor/multiproc_gpu_executor.py @@ -16,7 +16,7 @@ from vllm.triton_utils.importing import HAS_TRITON from vllm.utils import (_run_task_with_lock, cuda_device_count_stateless, cuda_is_initialized, get_distributed_init_method, - get_open_port, get_vllm_instance_id, make_async, + get_open_port, make_async, update_environment_variables) if HAS_TRITON: @@ -37,9 +37,6 @@ def _init_executor(self) -> None: world_size = self.parallel_config.world_size tensor_parallel_size = self.parallel_config.tensor_parallel_size - # Ensure that VLLM_INSTANCE_ID is set, to be inherited by workers - os.environ["VLLM_INSTANCE_ID"] = get_vllm_instance_id() - # Disable torch async compiling which won't work with daemonic processes os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1" diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 6542b18ae70b1..6554cda6b637b 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -15,8 +15,7 @@ from vllm.model_executor.layers.sampler import SamplerOutput from vllm.sequence import ExecuteModelRequest from vllm.utils import (_run_task_with_lock, get_distributed_init_method, - get_ip, get_open_port, get_vllm_instance_id, - make_async) + get_ip, get_open_port, make_async) if ray is not None: from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -220,14 +219,10 @@ def sort_by_driver_then_worker_ip(worker): " environment variable, make sure it is unique for" " each node.") - VLLM_INSTANCE_ID = get_vllm_instance_id() - # Set environment variables for the driver and workers. all_args_to_update_environment_variables = [({ "CUDA_VISIBLE_DEVICES": ",".join(map(str, node_gpus[node_id])), - "VLLM_INSTANCE_ID": - VLLM_INSTANCE_ID, "VLLM_TRACE_FUNCTION": str(envs.VLLM_TRACE_FUNCTION), **({ diff --git a/vllm/executor/ray_hpu_executor.py b/vllm/executor/ray_hpu_executor.py index a74328e5aa272..91c84d9214a60 100644 --- a/vllm/executor/ray_hpu_executor.py +++ b/vllm/executor/ray_hpu_executor.py @@ -15,8 +15,7 @@ from vllm.model_executor.layers.sampler import SamplerOutput from vllm.sequence import ExecuteModelRequest from vllm.utils import (_run_task_with_lock, get_distributed_init_method, - get_ip, get_open_port, get_vllm_instance_id, - make_async) + get_ip, get_open_port, make_async) if ray is not None: from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -196,12 +195,8 @@ def sort_by_driver_then_worker_ip(worker): "environment variable, make sure it is unique for" " each node.") - VLLM_INSTANCE_ID = get_vllm_instance_id() - # Set environment variables for the driver and workers. all_args_to_update_environment_variables = [({ - "VLLM_INSTANCE_ID": - VLLM_INSTANCE_ID, "VLLM_TRACE_FUNCTION": str(envs.VLLM_TRACE_FUNCTION), }, ) for (node_id, _) in worker_node_and_gpu_ids] diff --git a/vllm/executor/ray_tpu_executor.py b/vllm/executor/ray_tpu_executor.py index c227b5e283c68..3ee59397bf4c9 100644 --- a/vllm/executor/ray_tpu_executor.py +++ b/vllm/executor/ray_tpu_executor.py @@ -13,7 +13,7 @@ from vllm.model_executor.layers.sampler import SamplerOutput from vllm.sequence import ExecuteModelRequest from vllm.utils import (get_distributed_init_method, get_ip, get_open_port, - get_vllm_instance_id, make_async) + make_async) if ray is not None: from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -144,12 +144,8 @@ def sort_by_driver_then_worker_ip(worker): for i, (node_id, _) in enumerate(worker_node_and_gpu_ids): node_workers[node_id].append(i) - VLLM_INSTANCE_ID = get_vllm_instance_id() - # Set environment variables for the driver and workers. all_args_to_update_environment_variables = [({ - "VLLM_INSTANCE_ID": - VLLM_INSTANCE_ID, "VLLM_TRACE_FUNCTION": str(envs.VLLM_TRACE_FUNCTION), }, ) for _ in worker_node_and_gpu_ids] diff --git a/vllm/executor/ray_xpu_executor.py b/vllm/executor/ray_xpu_executor.py index 2b1cdc09b0a9f..61f5d6a65e999 100644 --- a/vllm/executor/ray_xpu_executor.py +++ b/vllm/executor/ray_xpu_executor.py @@ -5,7 +5,7 @@ from vllm.executor.ray_gpu_executor import RayGPUExecutor, RayGPUExecutorAsync from vllm.executor.xpu_executor import XPUExecutor from vllm.logger import init_logger -from vllm.utils import get_vllm_instance_id, make_async +from vllm.utils import make_async logger = init_logger(__name__) @@ -17,12 +17,8 @@ def _get_env_vars_to_be_updated(self): worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids", use_dummy_driver=True) - VLLM_INSTANCE_ID = get_vllm_instance_id() - # Set environment variables for the driver and workers. all_args_to_update_environment_variables = [({ - "VLLM_INSTANCE_ID": - VLLM_INSTANCE_ID, "VLLM_TRACE_FUNCTION": str(envs.VLLM_TRACE_FUNCTION), }, ) for (_, _) in worker_node_and_gpu_ids] diff --git a/vllm/utils.py b/vllm/utils.py index 6cee4847e57b4..1f19d9eacd16d 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -24,9 +24,9 @@ from collections.abc import Iterable, Mapping from functools import lru_cache, partial, wraps from platform import uname -from typing import (Any, AsyncGenerator, Awaitable, Callable, Dict, Generic, - Hashable, List, Literal, Optional, OrderedDict, Set, Tuple, - Type, TypeVar, Union, overload) +from typing import (TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, + Dict, Generic, Hashable, List, Literal, Optional, + OrderedDict, Set, Tuple, Type, TypeVar, Union, overload) from uuid import uuid4 import numpy as np @@ -43,6 +43,9 @@ from vllm.logger import enable_trace_function_call, init_logger from vllm.platforms import current_platform +if TYPE_CHECKING: + from vllm.config import VllmConfig + logger = init_logger(__name__) # Exception strings for non-implemented encoder/decoder scenarios @@ -335,17 +338,6 @@ def random_uuid() -> str: return str(uuid.uuid4().hex) -@lru_cache(maxsize=None) -def get_vllm_instance_id() -> str: - """ - If the environment variable VLLM_INSTANCE_ID is set, return it. - Otherwise, return a random UUID. - Instance id represents an instance of the VLLM. All processes in the same - instance should have the same instance id. - """ - return envs.VLLM_INSTANCE_ID or f"vllm-instance-{random_uuid()}" - - @lru_cache(maxsize=None) def in_wsl() -> bool: # Reference: https://github.com/microsoft/WSL/issues/4071 @@ -997,7 +989,7 @@ def find_nccl_library() -> str: return so_file -def enable_trace_function_call_for_thread() -> None: +def enable_trace_function_call_for_thread(vllm_config: "VllmConfig") -> None: """Set up function tracing for the current thread, if enabled via the VLLM_TRACE_FUNCTION environment variable """ @@ -1009,7 +1001,8 @@ def enable_trace_function_call_for_thread() -> None: filename = (f"VLLM_TRACE_FUNCTION_for_process_{os.getpid()}" f"_thread_{threading.get_ident()}_" f"at_{datetime.datetime.now()}.log").replace(" ", "_") - log_path = os.path.join(tmp_dir, "vllm", get_vllm_instance_id(), + log_path = os.path.join(tmp_dir, "vllm", + f"vllm-instance-{vllm_config.instance_id}", filename) os.makedirs(os.path.dirname(log_path), exist_ok=True) enable_trace_function_call(log_path) diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 7c0bc5a678956..6d00102e0a324 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -439,7 +439,7 @@ def init_worker(self, *args, **kwargs): Here we inject some common logic before initializing the worker. Arguments are passed to the worker class constructor. """ - enable_trace_function_call_for_thread() + enable_trace_function_call_for_thread(self.vllm_config) # see https://github.com/NVIDIA/nccl/issues/1234 os.environ['NCCL_CUMEM_ENABLE'] = '0'