Skip to content

Commit

Permalink
issue #140: expose internal constants to top-level module
Browse files Browse the repository at this point in the history
Allow to tune Pebble internal timers.

Signed-off-by: Matteo Cafasso <noxdafox@gmail.com>
  • Loading branch information
noxdafox committed Sep 28, 2024
1 parent 4a9c219 commit 6a12ddf
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pebble/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@


from pebble.decorators import synchronized, sighandler
from pebble.common import ProcessExpired, ProcessFuture
from pebble.functions import waitforqueues, waitforthreads
from pebble.common import ProcessExpired, ProcessFuture, CONSTS
from pebble.pool import ThreadPool, ProcessPool, MapFuture, ProcessMapFuture
4 changes: 2 additions & 2 deletions pebble/asynchronous/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async def _get_result(
timeout: float
) -> Any:
"""Waits for result and handles communication errors."""
counter = count(step=common.SLEEP_UNIT)
counter = count(step=common.CONSTS.sleep_unit)

try:
while not pipe.poll():
Expand All @@ -128,7 +128,7 @@ async def _get_result(
error = asyncio.CancelledError()
return common.Result(common.ResultStatus.FAILURE, error)

await asyncio.sleep(common.SLEEP_UNIT)
await asyncio.sleep(common.CONSTS.sleep_unit)

return pipe.recv()
except (EOFError, OSError):
Expand Down
2 changes: 1 addition & 1 deletion pebble/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pebble.common.shared import decorate_function, get_asyncio_loop
from pebble.common.types import ProcessExpired, ProcessFuture, PebbleFuture
from pebble.common.types import Result, ResultStatus, RemoteException
from pebble.common.types import FutureStatus, SLEEP_UNIT
from pebble.common.types import FutureStatus, CONSTS
from pebble.common.process import launch_process, stop_process
from pebble.common.process import register_function, maybe_install_trampoline
from pebble.common.process import process_execute, send_result, function_handler
4 changes: 2 additions & 2 deletions pebble/common/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from traceback import format_exc
from typing import Any, Callable

from pebble.common.types import Result, ResultStatus, RemoteException
from pebble.common.types import Result, ResultStatus, RemoteException, CONSTS


def launch_process(
Expand All @@ -46,7 +46,7 @@ def launch_process(
def stop_process(process: multiprocessing.Process):
"""Does its best to stop the process."""
process.terminate()
process.join(3)
process.join(CONSTS.term_timeout)

if process.is_alive() and os.name != 'nt':
try:
Expand Down
17 changes: 16 additions & 1 deletion pebble/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,19 @@ class FutureStatus(str, Enum):
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'


SLEEP_UNIT = 0.1
@dataclass
class Consts:
"""Internal constants.
WARNING: changing these values will affect the behaviour
of Pools and decorators.
"""
sleep_unit: float = 0.1
"""Any cycle which needs to periodically assess the state."""
term_timeout: float = 3
"""On UNIX once a SIGTERM signal is issued to a process,
the amount of seconds to wait before issuing a SIGKILL signal."""


CONSTS = Consts()
4 changes: 2 additions & 2 deletions pebble/concurrent/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ def _get_result(
timeout: float
) -> Any:
"""Waits for result and handles communication errors."""
counter = count(step=common.SLEEP_UNIT)
counter = count(step=common.CONSTS.sleep_unit)

try:
while not pipe.poll(common.SLEEP_UNIT):
while not pipe.poll(common.CONSTS.sleep_unit):
if timeout is not None and next(counter) >= timeout:
error = TimeoutError('Task Timeout', timeout)
return common.Result(common.ResultStatus.FAILURE, error)
Expand Down
4 changes: 2 additions & 2 deletions pebble/pool/base_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from concurrent.futures import Future, TimeoutError

from pebble.common import Result, ResultStatus
from pebble.common import PebbleFuture, ProcessFuture, SLEEP_UNIT
from pebble.common import PebbleFuture, ProcessFuture, CONSTS


class BasePool:
Expand Down Expand Up @@ -86,7 +86,7 @@ def _wait_queue_depletion(self, timeout: Optional[float]):
if timeout is not None and time.time() - tick > timeout:
raise TimeoutError("Tasks are still being executed")
elif self._context.task_queue.unfinished_tasks:
time.sleep(SLEEP_UNIT)
time.sleep(CONSTS.sleep_unit)
else:
return

Expand Down
8 changes: 4 additions & 4 deletions pebble/pool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
from pebble.pool.base_pool import PoolContext, BasePool, Task, TaskPayload
from pebble.pool.base_pool import PoolStatus, ProcessMapFuture, map_results
from pebble.pool.channel import ChannelError, WorkerChannel, channels
from pebble.common import Result, ResultStatus, CONSTS
from pebble.common import launch_process, stop_process
from pebble.common import ProcessExpired, ProcessFuture
from pebble.common import process_execute, launch_thread
from pebble.common import Result, ResultStatus, SLEEP_UNIT


class ProcessPool(BasePool):
Expand Down Expand Up @@ -112,7 +112,7 @@ def schedule(self, function: Callable,

def submit(self, function: Callable,
timeout: Optional[float],
*args, **kwargs) -> ProcessFuture:
/, *args, **kwargs) -> ProcessFuture:
"""This function is provided for compatibility with
`asyncio.loop.run_in_executor`.
Expand Down Expand Up @@ -178,7 +178,7 @@ def pool_manager_loop(pool_manager: 'PoolManager'):
try:
while context.alive and not GLOBAL_SHUTDOWN:
pool_manager.update_status()
time.sleep(SLEEP_UNIT)
time.sleep(CONSTS.sleep_unit)
except BrokenProcessPool:
context.status = PoolStatus.ERROR

Expand All @@ -188,7 +188,7 @@ def message_manager_loop(pool_manager: 'PoolManager'):

try:
while context.alive and not GLOBAL_SHUTDOWN:
pool_manager.process_next_message(SLEEP_UNIT)
pool_manager.process_next_message(CONSTS.sleep_unit)
except BrokenProcessPool:
context.status = PoolStatus.ERROR

Expand Down
6 changes: 3 additions & 3 deletions pebble/pool/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from typing import Callable
from concurrent.futures import Future

from pebble.common import ResultStatus, execute, launch_thread, SLEEP_UNIT
from pebble.pool.base_pool import PoolStatus, MapFuture, map_results
from pebble.common import ResultStatus, execute, launch_thread, CONSTS
from pebble.pool.base_pool import iter_chunks, run_initializer
from pebble.pool.base_pool import PoolStatus, MapFuture, map_results
from pebble.pool.base_pool import PoolContext, BasePool, Task, TaskPayload


Expand Down Expand Up @@ -117,7 +117,7 @@ def pool_manager_loop(pool_manager: 'PoolManager'):

while context.alive:
pool_manager.update_status()
time.sleep(SLEEP_UNIT)
time.sleep(CONSTS.sleep_unit)


class PoolManager:
Expand Down

0 comments on commit 6a12ddf

Please sign in to comment.