From e1f8a94b8d2e557e0607b44e568dc15defb8fb20 Mon Sep 17 00:00:00 2001 From: limengxun Date: Mon, 17 Feb 2025 18:10:11 +0800 Subject: [PATCH] logger --- pody/eng/log.py | 156 +++++++++++++++++++++++++++++++++++++++++++ pody/svc/app_base.py | 6 +- pody/svc/daemon.py | 9 +-- 3 files changed, 164 insertions(+), 7 deletions(-) create mode 100644 pody/eng/log.py diff --git a/pody/eng/log.py b/pody/eng/log.py new file mode 100644 index 0000000..678f9e4 --- /dev/null +++ b/pody/eng/log.py @@ -0,0 +1,156 @@ +from ..config import DATA_HOME +from typing import TypeVar, Callable, Literal, Optional +from concurrent.futures import ThreadPoolExecutor +from functools import wraps +import logging, pathlib, asyncio +from logging import handlers + +class BCOLORS: + HEADER = '\033[95m' + OKBLUE = '\033[94m' + OKCYAN = '\033[96m' + OKGREEN = '\033[92m' + OKGRAY = '\033[90m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + + # Additional colors + BLACK = '\033[30m' + RED = '\033[31m' + GREEN = '\033[32m' + YELLOW = '\033[33m' + BLUE = '\033[34m' + MAGENTA = '\033[35m' + CYAN = '\033[36m' + WHITE = '\033[37m' + LIGHTGRAY = '\033[37m' + DARKGRAY = '\033[90m' + LIGHTRED = '\033[91m' + LIGHTGREEN = '\033[92m' + LIGHTYELLOW = '\033[93m' + LIGHTBLUE = '\033[94m' + LIGHTMAGENTA = '\033[95m' + LIGHTCYAN = '\033[96m' + +_thread_pool = ThreadPoolExecutor(max_workers=2) +def thread_wrap(func): + def wrapper(*args, **kwargs): + _thread_pool.submit(func, *args, **kwargs) + return wrapper + +class BaseLogger(logging.Logger): + def finalize(self): + for handler in self.handlers: + handler.flush() + handler.close() + self.removeHandler(handler) + + @thread_wrap + def debug(self, *args, **kwargs): super().debug(*args, **kwargs) + @thread_wrap + def info(self, *args, **kwargs): super().info(*args, **kwargs) + @thread_wrap + def warning(self, *args, **kwargs): super().warning(*args, **kwargs) + @thread_wrap + def error(self, *args, **kwargs): super().error(*args, **kwargs) + +_fh_T = Literal['rotate', 'simple', 'daily'] + +__g_logger_dict: dict[str, BaseLogger] = {} +def get_logger( + name = 'default', + log_home = pathlib.Path(DATA_HOME) / 'logs', + level = 'DEBUG', + term_level = 'INFO', + file_handler_type: _fh_T = 'rotate', + global_instance = True + )->BaseLogger: + if global_instance and name in __g_logger_dict: + return __g_logger_dict[name] + + def setupLogger(logger: BaseLogger): + logger.setLevel(level) + + format_str = BCOLORS.LIGHTMAGENTA + ' %(asctime)s ' +BCOLORS.OKCYAN + '[%(name)s][%(levelname)s] ' + BCOLORS.ENDC + ' %(message)s' + formatter = logging.Formatter(format_str) + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + console_handler.setLevel(term_level) + logger.addHandler(console_handler) + + # format_str_plain = format_str.replace(BCOLORS.LIGHTMAGENTA, '').replace(BCOLORS.OKCYAN, '').replace(BCOLORS.ENDC, '') + format_str_plain = format_str + for color in BCOLORS.__dict__.values(): + if isinstance(color, str) and color.startswith('\033'): + format_str_plain = format_str_plain.replace(color, '') + + formatter_plain = logging.Formatter(format_str_plain) + log_home.mkdir(exist_ok=True) + log_file = log_home / f'{name}.log' + if file_handler_type == 'simple': + file_handler = logging.FileHandler(log_file) + elif file_handler_type == 'daily': + file_handler = handlers.TimedRotatingFileHandler( + log_file, when='midnight', interval=1, backupCount=30 + ) + elif file_handler_type == 'rotate': + file_handler = handlers.RotatingFileHandler( + log_file, maxBytes=5*1024*1024, backupCount=3 # 5MB + ) + + file_handler.setFormatter(formatter_plain) + logger.addHandler(file_handler) + + logger = BaseLogger(name) + setupLogger(logger) + if global_instance: + __g_logger_dict[name] = logger + + return logger + +def clear_handlers(logger: logging.Logger): + for handler in logger.handlers: + handler.flush() + handler.close() + logger.removeHandler(handler) + __g_logger_dict.pop(logger.name, None) + # print(f'Cleared handlers for logger {logger.name}') + +FUNCTION_T = TypeVar('FUNCTION_T', bound=Callable) +def log_access( + include_args: bool = True, + logger: Optional[BaseLogger] = None, +): + if logger is None: + logger = get_logger() + + def _log_access(fn: FUNCTION_T) -> FUNCTION_T: + if asyncio.iscoroutinefunction(fn): + @wraps(fn) + async def async_wrapper(*args, **kwargs): + if include_args: + logger.info(f'[func] <{fn.__name__}> called with: {args}, {kwargs}') + else: + logger.info(f'[func] <{fn.__name__}>') + + return await fn(*args, **kwargs) + return async_wrapper # type: ignore + else: + @wraps(fn) + def wrapper(*args, **kwargs): + logger = get_logger() + if include_args: + logger.info(f'[func] <{fn.__name__}> called with: {args}, {kwargs}') + else: + logger.info(f'[func] <{fn.__name__}>') + + return fn(*args, **kwargs) + return wrapper # type: ignore + return _log_access + +__ALL__ = [ + 'get_logger', 'log_access' +] \ No newline at end of file diff --git a/pody/svc/app_base.py b/pody/svc/app_base.py index 7c7e4b2..22cc236 100644 --- a/pody/svc/app_base.py +++ b/pody/svc/app_base.py @@ -10,6 +10,7 @@ import docker from ..eng.errors import * +from ..eng.log import get_logger from ..config import config from ..eng.user import UserDatabase, hash_password @@ -68,9 +69,8 @@ def _require_permission(user = Depends(get_user)): @app.middleware("http") async def log_requests(request, call_next): - print(f"Request: {request.url}") - print(f"Headers: {request.headers}") - print(f"From: {request.client.host}") + logger = get_logger('requests') + logger.debug(f"Request: {request.url} | From: {request.client.host} | Headers: {request.headers}") response = await call_next(request) return response diff --git a/pody/svc/daemon.py b/pody/svc/daemon.py index 6766d7e..68d8922 100644 --- a/pody/svc/daemon.py +++ b/pody/svc/daemon.py @@ -4,6 +4,7 @@ from ..eng.user import UserDatabase from ..eng.gpu import GPUHandler from ..eng.docker import exec_container_bash +from ..eng.log import get_logger from .router_host import gpu_status_impl def leave_info(container_name, info: str, level: str = "info"): @@ -14,6 +15,7 @@ def leave_info(container_name, info: str, level: str = "info"): exec_container_bash(container_name, f"mkdir -p {logdir} && echo '{info}' > {logdir}/{fname}") def task_check_gpu_usage(): + logger = get_logger('daemon') client = docker.from_env() user_db = UserDatabase() @@ -32,8 +34,6 @@ def task_check_gpu_usage(): for user in this_gpu_users: user_proc_count[user] = user_proc_count.get(user, 0) + 1 - print("[Daemon] GPU usage: {}".format(user_proc_count)) - for username, proc_count in user_proc_count.items(): user = user_db.get_user(username) if user.userid == 0: # skip task not related to this database @@ -49,15 +49,16 @@ def task_check_gpu_usage(): cmd = p['cmd'] leave_info(pod_name, f"Killed container with pid-{pid} ({cmd}) due to GPU quota exceeded.", "critical") client.containers.get(pod_name).stop() - print(f"[Daemon] Killed container {pod_name} with pid-{pid} ({cmd}) due to GPU quota exceeded.") + logger.info(f"Killed container {pod_name} with pid-{pid} ({cmd}) due to GPU quota exceeded.") def daemon_worker(): + logger = get_logger('daemon') while True: try: task_check_gpu_usage() except Exception as e: if isinstance(e, KeyboardInterrupt): raise - print(f"[Daemon] Error: {e}") + logger.exception("Daemon task failed: " + str(e)) time.sleep(60) # check every minute def start_daemon() -> mp.Process: