From 1da99a81f56ebdf98bcf0f9bf4ad27a7a607ffef Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 17:45:56 +0000 Subject: [PATCH] updated --- vllm/v1/executor/multiproc_executor.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 9d9d03b0228ee..583a9c36cdfa0 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -332,7 +332,6 @@ def signal_handler(signum, frame): signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) - parent_process = psutil.Process().parent() worker: Optional[WorkerProc] = None try: worker = WorkerProc(*args, **kwargs) @@ -345,12 +344,17 @@ def signal_handler(signum, frame): worker.worker_busy_loop() except SystemExit: + # Avoid re-raising SystemExit more than once, such + # that we have a cleaner stack trace. + shutdown_requested = True logger.debug("Worker interrupted.") except Exception: - traceback = get_exception_traceback() - logger.error("Worker hit an exception: %s", traceback) - parent_process.send_signal(signal.SIGQUIT) + # While busy loop handles exceptions and alerts EngineCore, + # if there is an error in the startup process (e.g. OOM) + # or there is an error with the IPC itself, alert parent + # so we can shut down the whole system. 
+ psutil.Process().parent().send_signal(signal.SIGQUIT) raise finally: @@ -387,19 +391,18 @@ class ResponseStatus(Enum): def worker_busy_loop(self): """Main busy loop for Multiprocessing Workers""" - i = 0 + method, args, kwargs = self.rpc_broadcast_mq.dequeue() while True: - method, args, kwargs = self.rpc_broadcast_mq.dequeue() - try: - if i == 10: - raise ValueError("SIMULATE CUDA EXCEPTION") - i += 1 + if self.rank == 0: + raise ValueError("SIMULATE CUDA ERROR") output = getattr(self.worker, method)(*args, **kwargs) except Exception as e: self.worker_response_mq.enqueue( (WorkerProc.ResponseStatus.FAILURE, e)) - continue + traceback = get_exception_traceback() + logger.error("WorkerProc hit an exception: %s", traceback) + raise SystemExit() self.worker_response_mq.enqueue( (WorkerProc.ResponseStatus.SUCCESS, output))