Skip to content

Commit

Permalink
updated
Browse files Browse the repository at this point in the history
  • Loading branch information
robertgshaw2-redhat committed Jan 3, 2025
1 parent c29f329 commit 1c4b92a
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
@@ -3,12 +3,12 @@
import signal
import sys
import time
import weakref
from dataclasses import dataclass
from enum import Enum, auto
from multiprocessing.process import BaseProcess
from typing import Any, Dict, List, Optional, Tuple

import psutil
import zmq

from vllm.config import VllmConfig
@@ -19,8 +19,9 @@
from vllm.executor.multiproc_worker_utils import (
_add_prefix, set_multiprocessing_worker_envs)
from vllm.logger import init_logger
from vllm.utils import (get_distributed_init_method, get_mp_context,
get_open_port, get_open_zmq_ipc_path, zmq_socket_ctx)
from vllm.utils import (get_distributed_init_method, get_exception_traceback,
get_mp_context, get_open_port, get_open_zmq_ipc_path,
kill_process_tree, zmq_socket_ctx)
from vllm.v1.executor.abstract import Executor
from vllm.v1.outputs import ModelRunnerOutput
from vllm.worker.worker_base import WorkerWrapperBase
@@ -34,10 +35,25 @@
class MultiprocExecutor(Executor):

def __init__(self, vllm_config: VllmConfig) -> None:
# Call self.shutdown at exit to clean up
# and ensure workers will be terminated.
self._finalizer = weakref.finalize(self, self.shutdown)

# The child processes will send SIGQUIT when unrecoverable
# errors happen. We kill the process tree here so that the
# stack trace is very evident.
# TODO: rather than killing the main process, we should
# figure out how to raise an AsyncEngineDeadError and
# handle at the API server level so we can return a better
# error code to the clients calling VLLM.

def sigquit_handler(signum, frame):
logger.fatal(
"MulitprocExecutor got SIGQUIT from worker processes, shutting "
"down. See stack trace above for root cause issue.")
# Propagate error up to parent process.
parent_process = psutil.Process().parent()
parent_process.send_signal(signal.SIGQUIT)
kill_process_tree(os.getpid())

signal.signal(signal.SIGQUIT, sigquit_handler)
self.vllm_config = vllm_config
self.parallel_config = vllm_config.parallel_config

@@ -321,7 +337,8 @@ def signal_handler(signum, frame):
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

worker = None
parent_process = psutil.Process().parent()
worker: Optional[WorkerProc] = None
try:
worker = WorkerProc(*args, **kwargs)

@@ -335,9 +352,10 @@ def signal_handler(signum, frame):
except SystemExit:
logger.debug("Worker interrupted.")

except BaseException as e:
logger.exception(e)
raise
except Exception:
traceback = get_exception_traceback()
logger.error("Worker hit an exception: %s", traceback)
parent_process.send_signal(signal.SIGQUIT)

finally:
# Clean up once worker exits busy loop

0 comments on commit 1c4b92a

Please sign in to comment.