Skip to content

Commit

Permalink
logger
Browse files Browse the repository at this point in the history
  • Loading branch information
MenxLi committed Feb 17, 2025
1 parent 8eeea0f commit e1f8a94
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 7 deletions.
156 changes: 156 additions & 0 deletions pody/eng/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from ..config import DATA_HOME
from typing import TypeVar, Callable, Literal, Optional
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
import logging, pathlib, asyncio
from logging import handlers

class BCOLORS:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
OKGRAY = '\033[90m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'

# Additional colors
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
LIGHTGRAY = '\033[37m'
DARKGRAY = '\033[90m'
LIGHTRED = '\033[91m'
LIGHTGREEN = '\033[92m'
LIGHTYELLOW = '\033[93m'
LIGHTBLUE = '\033[94m'
LIGHTMAGENTA = '\033[95m'
LIGHTCYAN = '\033[96m'

_thread_pool = ThreadPoolExecutor(max_workers=2)
def thread_wrap(func):
def wrapper(*args, **kwargs):
_thread_pool.submit(func, *args, **kwargs)
return wrapper

class BaseLogger(logging.Logger):
def finalize(self):
for handler in self.handlers:
handler.flush()
handler.close()
self.removeHandler(handler)

@thread_wrap
def debug(self, *args, **kwargs): super().debug(*args, **kwargs)
@thread_wrap
def info(self, *args, **kwargs): super().info(*args, **kwargs)
@thread_wrap
def warning(self, *args, **kwargs): super().warning(*args, **kwargs)
@thread_wrap
def error(self, *args, **kwargs): super().error(*args, **kwargs)

_fh_T = Literal['rotate', 'simple', 'daily']

__g_logger_dict: dict[str, BaseLogger] = {}
def get_logger(
name = 'default',
log_home = pathlib.Path(DATA_HOME) / 'logs',
level = 'DEBUG',
term_level = 'INFO',
file_handler_type: _fh_T = 'rotate',
global_instance = True
)->BaseLogger:
if global_instance and name in __g_logger_dict:
return __g_logger_dict[name]

def setupLogger(logger: BaseLogger):
logger.setLevel(level)

format_str = BCOLORS.LIGHTMAGENTA + ' %(asctime)s ' +BCOLORS.OKCYAN + '[%(name)s][%(levelname)s] ' + BCOLORS.ENDC + ' %(message)s'
formatter = logging.Formatter(format_str)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(term_level)
logger.addHandler(console_handler)

# format_str_plain = format_str.replace(BCOLORS.LIGHTMAGENTA, '').replace(BCOLORS.OKCYAN, '').replace(BCOLORS.ENDC, '')
format_str_plain = format_str
for color in BCOLORS.__dict__.values():
if isinstance(color, str) and color.startswith('\033'):
format_str_plain = format_str_plain.replace(color, '')

formatter_plain = logging.Formatter(format_str_plain)
log_home.mkdir(exist_ok=True)
log_file = log_home / f'{name}.log'
if file_handler_type == 'simple':
file_handler = logging.FileHandler(log_file)
elif file_handler_type == 'daily':
file_handler = handlers.TimedRotatingFileHandler(
log_file, when='midnight', interval=1, backupCount=30
)
elif file_handler_type == 'rotate':
file_handler = handlers.RotatingFileHandler(
log_file, maxBytes=5*1024*1024, backupCount=3 # 5MB
)

file_handler.setFormatter(formatter_plain)
logger.addHandler(file_handler)

logger = BaseLogger(name)
setupLogger(logger)
if global_instance:
__g_logger_dict[name] = logger

return logger

def clear_handlers(logger: logging.Logger):
for handler in logger.handlers:
handler.flush()
handler.close()
logger.removeHandler(handler)
__g_logger_dict.pop(logger.name, None)
# print(f'Cleared handlers for logger {logger.name}')

FUNCTION_T = TypeVar('FUNCTION_T', bound=Callable)
def log_access(
include_args: bool = True,
logger: Optional[BaseLogger] = None,
):
if logger is None:
logger = get_logger()

def _log_access(fn: FUNCTION_T) -> FUNCTION_T:
if asyncio.iscoroutinefunction(fn):
@wraps(fn)
async def async_wrapper(*args, **kwargs):
if include_args:
logger.info(f'[func] <{fn.__name__}> called with: {args}, {kwargs}')
else:
logger.info(f'[func] <{fn.__name__}>')

return await fn(*args, **kwargs)
return async_wrapper # type: ignore
else:
@wraps(fn)
def wrapper(*args, **kwargs):
logger = get_logger()
if include_args:
logger.info(f'[func] <{fn.__name__}> called with: {args}, {kwargs}')
else:
logger.info(f'[func] <{fn.__name__}>')

return fn(*args, **kwargs)
return wrapper # type: ignore
return _log_access

__ALL__ = [
'get_logger', 'log_access'
]
6 changes: 3 additions & 3 deletions pody/svc/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import docker

from ..eng.errors import *
from ..eng.log import get_logger
from ..config import config
from ..eng.user import UserDatabase, hash_password

Expand Down Expand Up @@ -68,9 +69,8 @@ def _require_permission(user = Depends(get_user)):

@app.middleware("http")
async def log_requests(request, call_next):
print(f"Request: {request.url}")
print(f"Headers: {request.headers}")
print(f"From: {request.client.host}")
logger = get_logger('requests')
logger.debug(f"Request: {request.url} | From: {request.client.host} | Headers: {request.headers}")
response = await call_next(request)
return response

Expand Down
9 changes: 5 additions & 4 deletions pody/svc/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ..eng.user import UserDatabase
from ..eng.gpu import GPUHandler
from ..eng.docker import exec_container_bash
from ..eng.log import get_logger
from .router_host import gpu_status_impl

def leave_info(container_name, info: str, level: str = "info"):
Expand All @@ -14,6 +15,7 @@ def leave_info(container_name, info: str, level: str = "info"):
exec_container_bash(container_name, f"mkdir -p {logdir} && echo '{info}' > {logdir}/{fname}")

def task_check_gpu_usage():
logger = get_logger('daemon')
client = docker.from_env()
user_db = UserDatabase()

Expand All @@ -32,8 +34,6 @@ def task_check_gpu_usage():
for user in this_gpu_users:
user_proc_count[user] = user_proc_count.get(user, 0) + 1

print("[Daemon] GPU usage: {}".format(user_proc_count))

for username, proc_count in user_proc_count.items():
user = user_db.get_user(username)
if user.userid == 0: # skip task not related to this database
Expand All @@ -49,15 +49,16 @@ def task_check_gpu_usage():
cmd = p['cmd']
leave_info(pod_name, f"Killed container with pid-{pid} ({cmd}) due to GPU quota exceeded.", "critical")
client.containers.get(pod_name).stop()
print(f"[Daemon] Killed container {pod_name} with pid-{pid} ({cmd}) due to GPU quota exceeded.")
logger.info(f"Killed container {pod_name} with pid-{pid} ({cmd}) due to GPU quota exceeded.")

def daemon_worker():
logger = get_logger('daemon')
while True:
try:
task_check_gpu_usage()
except Exception as e:
if isinstance(e, KeyboardInterrupt): raise
print(f"[Daemon] Error: {e}")
logger.exception("Daemon task failed: " + str(e))
time.sleep(60) # check every minute

def start_daemon() -> mp.Process:
Expand Down

0 comments on commit e1f8a94

Please sign in to comment.