From 6e0e0d43838ef6bab5d7b3daaa3c6745ee9420ad Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 18:38:05 +0000 Subject: [PATCH] stash --- vllm/v1/executor/multiproc_executor.py | 20 +++++++++----------- vllm/v1/worker/gpu_worker.py | 6 ++++++ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 82f5acbb953d5..caff86b012211 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -3,6 +3,7 @@ import signal import sys import time +import weakref from dataclasses import dataclass from enum import Enum, auto from multiprocessing.process import BaseProcess @@ -19,9 +20,8 @@ from vllm.executor.multiproc_worker_utils import ( _add_prefix, set_multiprocessing_worker_envs) from vllm.logger import init_logger -from vllm.utils import (get_distributed_init_method, get_exception_traceback, - get_mp_context, get_open_port, get_open_zmq_ipc_path, - kill_process_tree, zmq_socket_ctx) +from vllm.utils import (get_distributed_init_method, get_mp_context, + get_open_port, get_open_zmq_ipc_path, zmq_socket_ctx) from vllm.v1.executor.abstract import Executor from vllm.v1.outputs import ModelRunnerOutput from vllm.worker.worker_base import WorkerWrapperBase @@ -35,6 +35,9 @@ class MultiprocExecutor(Executor): def __init__(self, vllm_config: VllmConfig) -> None: + # Call self.shutdown at exit to clean up + # and ensure workers will be terminated. + self._finalizer = weakref.finalize(self, self.shutdown) # The child processes will send SIGQUIT when unrecoverable # errors happen. @@ -344,15 +347,12 @@ def signal_handler(signum, frame): worker.worker_busy_loop() except SystemExit: - # worker_busy_loop sends exceptions to Executor and raises - # SystemExit. - shutdown_requested = True logger.debug("Worker interrupted.") except Exception: # worker_busy_loop sends exceptions exceptons to Executor # for shutdown, but if there is an error in startup or an - # error with IPC + # error with IPC itself, we need to alert the parent. # itself, we need to alert the parent so we can shut down. psutil.Process().parent().send_signal(signal.SIGQUIT) raise @@ -390,18 +390,16 @@ class ResponseStatus(Enum): def worker_busy_loop(self): """Main busy loop for Multiprocessing Workers""" + while True: method, args, kwargs = self.rpc_broadcast_mq.dequeue() try: - if self.rank == 0: - raise ValueError("SIMULATE CUDA ERROR") output = getattr(self.worker, method)(*args, **kwargs) except Exception as e: self.worker_response_mq.enqueue( (WorkerProc.ResponseStatus.FAILURE, e)) - traceback = get_exception_traceback() - logger.error("WorkerProc hit an exception: %s", traceback) + logger.exception("WorkerProc hit an exception: %s", exc_info=e) continue self.worker_response_mq.enqueue( diff --git a/vllm/v1/worker/gpu_worker.py b/vllm/v1/worker/gpu_worker.py index af438f7d5820c..a7723e4e85264 100644 --- a/vllm/v1/worker/gpu_worker.py +++ b/vllm/v1/worker/gpu_worker.py @@ -35,6 +35,8 @@ def __init__( distributed_init_method: str, ): + self.i = 0 + # TODO: use WorkerBase.__init__(self, vllm_config=vllm_config) self.vllm_config = vllm_config self.model_config = vllm_config.model_config @@ -201,6 +203,10 @@ def execute_model( self, scheduler_output: "SchedulerOutput", ) -> ModelRunnerOutput: + if self.rank == 0 and self.i == 10: + raise ValueError("ERROR FROM HERE :)") + self.i += 1 + output = self.model_runner.execute_model(scheduler_output) return output if self.rank == 0 else None