diff --git a/mbapy_lite/base.py b/mbapy_lite/base.py index 7afeeb5..00a21d8 100644 --- a/mbapy_lite/base.py +++ b/mbapy_lite/base.py @@ -2,7 +2,7 @@ Author: BHM-Bob 2262029386@qq.com Date: 2022-10-19 22:46:30 LastEditors: BHM-Bob 2262029386@qq.com -LastEditTime: 2024-11-30 17:17:48 +LastEditTime: 2025-02-06 20:18:07 Description: ''' import inspect @@ -191,14 +191,16 @@ def get_call_stack(): break return stack_info[1:][::-1] -def put_err(info:str, ret = None, warning_level = 0, _exit: Union[bool, int] = False): +def put_err(info:str, ret = None, warning_level = 0, _exit: Union[bool, int] = False, full_stack = True): """ Prints an error message along with the caller's name and arguments, if the warning level is greater than or equal to the error warning level specified in the Configs class. Parameters: - info (str): The error message to be printed. - ret (Any, optional): The return value of the function. Defaults to None. - warning_level (int, optional): The warning level of the error. Defaults to 0. + - info (str): The error message to be printed. + - ret (Any, optional): The return value of the function. Defaults to None. + - warning_level (int, optional): The warning level of the error. Defaults to 0. + - _exit (Union[bool, int], optional): Whether to exit the program after printing the error message. If an integer is provided, the program will exit with that status code. Defaults to False. + - full_stack (bool, optional): Whether to log the full call stack. Defaults to True. Returns: Any: The value specified by the ret parameter. @@ -212,31 +214,36 @@ def put_err(info:str, ret = None, warning_level = 0, _exit: Union[bool, int] = F - It appends the log to the list of logs in the Configs class and prints it. """ if warning_level >= Configs.err_warning_level: + time_str = get_fmt_time("%Y-%m-%d %H:%M:%S.%f") frame = inspect.currentframe().f_back - caller_name = frame.f_code.co_name caller_args = inspect.getargvalues(frame).args - err_str = f'\nERROR INFO : {caller_name:s} {caller_args}:\n {info:s}\n' + stacks = get_call_stack()[:-1] if full_stack else [get_call_stack()[-2]] + err_str = f'\nERROR[{time_str:s}]: {">".join(stacks):s}({", ".join(caller_args)}):\n {info:s}\n' print(err_str) Configs.logs.append(err_str) if not __NO_ERR__ and _exit: exit(_exit) return ret -def put_log(info:str, head = "bapy::log", ret = None): +def put_log(info:str, head = "log", ret = None, full_stack = False, new_line: bool = False): """ Logs the given information with a formatted timestamp, call stack, and provided head. Appends the log to the list of logs in the Configs class and prints it. Parameters: - info (str): The information to be logged. - - head (str): The head of the log message. Default is "bapy::log". - - ret : The value to return. Default is None. + - head (str): The head of the log message. Default is "log". + - ret (Any) : The value to return. Default is None. + - full_stack (bool): Whether to log the full call stack. Default is False. + - new_line (bool): Whether to add a new line after the log message. Default is False. Returns: Any: The value specified by the ret parameter. """ - time_str = get_fmt_time() - log_str = f'\n{head:s} {time_str:s}: {">".join(get_call_stack()[:-1]):s}: {info:s}\n' + time_str = get_fmt_time("%Y-%m-%d %H:%M:%S.%f") + stacks = get_call_stack()[:-1] if full_stack else [get_call_stack()[-2]] + new_line = '\n' if new_line else ' ' + log_str = f'\n{head:s}[{time_str:s}]: {">".join(stacks):s}:{new_line}{info:s}\n' Configs.logs.append(log_str) print(log_str) return ret diff --git a/mbapy_lite/web_utils/task.py b/mbapy_lite/web_utils/task.py index c566226..4574869 100644 --- a/mbapy_lite/web_utils/task.py +++ b/mbapy_lite/web_utils/task.py @@ -269,6 +269,7 @@ def __init__(self, mode: str = 'async', n_worker: int = None, self.MODE = mode self.N_WORKER = n_worker self.IS_STARTED = False + self.sleep_while_empty = sleep_while_empty self._async_loop: asyncio.AbstractEventLoop = None self._thread_task_queue: Queue = Queue() self._thread_result_queue: Queue = Queue() @@ -317,6 +318,8 @@ def _run_process_loop(self): except Exception as e: self._thread_result_queue.put((task_name, e, TaskStatus.NOT_SUCCEEDED)) del tasks_cache[task_name] + time.sleep(self.sleep_while_empty) + time.sleep(self.sleep_while_empty) pool.close() def _run_isolated_process_loop(self): @@ -505,6 +508,32 @@ def wait_till_tasks_done(self, task_names: List[str], self.wait_till(lambda names: names.issubset(set([r[0] for r in self.tasks.values() if r != TaskStatus.NOT_RETURNED])), wait_each_loop = wait_each_loop, verbose=False, names=set(task_names)) return {name: self.query_task(name) for name in task_names} + + def clear(self, clear_tasks: bool = True, clear_queue: bool = True): + """ + Clear the task pool, including tasks and queues. + + Parameters: + - clear_tasks (bool, default=True): Whether to clear the task dictionary. + - clear_queue (bool, default=True): Whether to clear the task queue and result queue. + + Returns: + list: A list containing the sizes of the task dictionary, task queue, and result queue before clearing. + + Notes: + - This method does not check wether the task pool is running or not. + """ + # Record the sizes of the task dictionary, task queue, and result queue before clearing + sizes = [len(self.tasks), self._thread_task_queue.qsize(), self._thread_result_queue.qsize()] + # Clear the task dictionary + if clear_tasks: + self.tasks.clear() + # Clear the task queue and result queue + while not self._thread_task_queue.empty() and clear_queue: + self._thread_task_queue.get() + while not self._thread_result_queue.empty() and clear_queue: + self._thread_result_queue.get() + return sizes def close(self): """close the thread and event loop, join the thread"""