diff --git a/compute_horde/compute_horde/executor_class.py b/compute_horde/compute_horde/executor_class.py index 67fbfbee4..2e7410587 100644 --- a/compute_horde/compute_horde/executor_class.py +++ b/compute_horde/compute_horde/executor_class.py @@ -61,8 +61,9 @@ class ExecutorClassSpec: } -# this leaves around 1 min for synthetic job to complete -MAX_EXECUTOR_TIMEOUT = timedelta(minutes=4).total_seconds() +# we split 144min 2 tempos window to 24 validators - this is total time after reservation, +# validator may wait spin_up time of executor class to synchronize running synthetic batch +MAX_EXECUTOR_TIMEOUT = timedelta(minutes=6).total_seconds() DEFAULT_EXECUTOR_CLASS = ExecutorClass.spin_up_4min__gpu_24gb DEFAULT_EXECUTOR_TIMEOUT = EXECUTOR_CLASS[DEFAULT_EXECUTOR_CLASS].spin_up_time diff --git a/miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py b/miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py index f6d34a72a..d0d8bafc9 100644 --- a/miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py +++ b/miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py @@ -5,6 +5,7 @@ import time from compute_horde.executor_class import ( + EXECUTOR_CLASS, MAX_EXECUTOR_TIMEOUT, ExecutorClass, ) @@ -29,6 +30,9 @@ def is_expired(self): class ExecutorClassPool: + RESERVATION_TIMEOUT = MAX_EXECUTOR_TIMEOUT + POOL_CLEANUP_PERIOD = 10 + def __init__(self, manager, executor_class: ExecutorClass, executor_count: int): self.manager = manager self.executor_class = executor_class @@ -42,7 +46,7 @@ async def reserve_executor(self, token, timeout): async with self._reservation_lock: while True: if self.get_availability() == 0: - if time.time() - start < MAX_EXECUTOR_TIMEOUT: + if time.time() - start < self.RESERVATION_TIMEOUT: await asyncio.sleep(1) else: logger.warning("Error unavailable after timeout") @@ -71,26 +75,36 @@ async def _pool_cleanup_loop(self): await self._pool_cleanup() except Exception as exc: logger.error("Error occurred", exc_info=exc) - await asyncio.sleep(10) + await asyncio.sleep(self.POOL_CLEANUP_PERIOD) async def _pool_cleanup(self): - executors_to_drop = set() - for reserved_executor in self._executors: + async def check_executor(reserved_executor): status = await self.manager.wait_for_executor(reserved_executor.executor, 1) if status is not None: - executors_to_drop.add(reserved_executor) - else: - if reserved_executor.is_expired(): - await self.manager.kill_executor(reserved_executor.executor) - executors_to_drop.add(reserved_executor) - still_running_executors = [] - for reserved_executor in self._executors: - if reserved_executor not in executors_to_drop: - still_running_executors.append(reserved_executor) - self._executors = still_running_executors + return reserved_executor, True + elif reserved_executor.is_expired(): + await self.manager.kill_executor(reserved_executor.executor) + return reserved_executor, True + return reserved_executor, False + + results = await asyncio.gather( + *[check_executor(reserved_executor) for reserved_executor in self._executors] + ) + + executors_to_drop = set( + reserved_executor for reserved_executor, should_drop in results if should_drop + ) + + self._executors = [ + reserved_executor + for reserved_executor in self._executors + if reserved_executor not in executors_to_drop + ] class BaseExecutorManager(metaclass=abc.ABCMeta): + EXECUTOR_TIMEOUT_LEEWAY = dt.timedelta(seconds=30).total_seconds() + def __init__(self): self._executor_class_pools = {} @@ -135,4 +149,11 @@ async def get_executor_class_pool(self, executor_class): async def reserve_executor_class(self, token, executor_class, timeout): pool = await self.get_executor_class_pool(executor_class) - await pool.reserve_executor(token, timeout) + await pool.reserve_executor(token, self.get_total_timeout(executor_class, timeout)) + + def get_total_timeout(self, executor_class, job_timeout): + spec = EXECUTOR_CLASS.get(executor_class) + spin_up_time = 0 + if spec is not None: + spin_up_time = spec.spin_up_time + return spin_up_time + job_timeout + self.EXECUTOR_TIMEOUT_LEEWAY diff --git a/miner/app/src/compute_horde_miner/miner/tests/test_executor_manager.py b/miner/app/src/compute_horde_miner/miner/tests/test_executor_manager.py new file mode 100644 index 000000000..46961c7a2 --- /dev/null +++ b/miner/app/src/compute_horde_miner/miner/tests/test_executor_manager.py @@ -0,0 +1,208 @@ +import asyncio +from contextlib import asynccontextmanager +from unittest.mock import patch + +import pytest +import pytest_asyncio +from compute_horde.executor_class import ExecutorClass + +from compute_horde_miner.miner.executor_manager._internal.base import ( + BaseExecutorManager, + ExecutorUnavailable, +) + + +class DummyExecutor: + def __init__(self, execution_time): + self.execution_time = execution_time + self.task = None + + async def run(self): + try: + await asyncio.sleep(self.execution_time) + return "Completed" + except asyncio.CancelledError: + return "Cancelled" + + +class DummyExecutorManager(BaseExecutorManager): + def __init__(self, manifest, runtime_offset=0): + super().__init__() + self.manifest = manifest + self.executors = [] + self.runtime_offset = runtime_offset + self.EXECUTOR_TIMEOUT_LEEWAY = 0 + + @asynccontextmanager + async def set_runtime_offset(self, offset): + old_offset = self.runtime_offset + self.runtime_offset = offset + try: + yield + finally: + self.runtime_offset = old_offset + + async def start_new_executor(self, token, executor_class, timeout): + executor = DummyExecutor(timeout + self.runtime_offset) + executor.task = asyncio.create_task(executor.run()) + self.executors.append(executor) + return executor + + async def kill_executor(self, executor): + if executor.task and not executor.task.done(): + executor.task.cancel() + if executor in self.executors: + self.executors.remove(executor) + + async def wait_for_executor(self, executor, timeout): + try: + return await asyncio.wait_for(asyncio.shield(executor.task), timeout) + except TimeoutError: + return None + + async def get_manifest(self): + return self.manifest + + +@pytest_asyncio.fixture +async def dummy_manager(): + manifest = { + ExecutorClass.always_on__gpu_24gb: 2, + } + manager = DummyExecutorManager(manifest, runtime_offset=-2) + yield manager + for pool in manager._executor_class_pools.values(): + pool._pool_cleanup_task.cancel() + try: + await pool._pool_cleanup_task + except asyncio.CancelledError: + pass + + +@pytest.mark.asyncio +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT", + 0, +) +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD", + 0.1, +) +async def test_executor_class_pool(dummy_manager): + # Test reserving executors + pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + + executor1 = await pool.reserve_executor("token1", 2) + assert isinstance(executor1, DummyExecutor) + assert pool.get_availability() == 1 + + executor2 = await pool.reserve_executor("token2", 7) + assert isinstance(executor2, DummyExecutor) + assert pool.get_availability() == 0 + + # Test ExecutorUnavailable exception + with pytest.raises(ExecutorUnavailable): + await pool.reserve_executor("token3", 20) + + # Test executor completion + status = await dummy_manager.wait_for_executor(executor1, 1) + assert status == "Completed" + status = await dummy_manager.wait_for_executor(executor2, 1) + assert status is None + + # Allow time for pool cleanup + await asyncio.sleep(1) + assert pool.get_availability() == 1 + + # Test executor completion + status = await dummy_manager.wait_for_executor(executor2, 7) + assert status == "Completed" + + # Allow time for pool cleanup + await asyncio.sleep(1) + assert pool.get_availability() == 2 + + # Test long-running executor + async with dummy_manager.set_runtime_offset(5): + long_running_executor = await pool.reserve_executor("token4", 5) + + # Wait a bit, but not long enough for the executor to complete + status = await dummy_manager.wait_for_executor(long_running_executor, 2) + assert status is None + + # Wait for the executor to be killed by the cleanup process + status = await dummy_manager.wait_for_executor(long_running_executor, 10) + assert status == "Cancelled" + + # Allow time for pool cleanup + await asyncio.sleep(1) + assert pool.get_availability() == 2 + + +@pytest.mark.asyncio +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT", + 0, +) +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD", + 0.1, +) +async def test_manager_reserve_executor_class(dummy_manager): + await dummy_manager.reserve_executor_class("token1", ExecutorClass.always_on__gpu_24gb, 10) + assert ( + await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + ).get_availability() == 1 + + await dummy_manager.reserve_executor_class("token2", ExecutorClass.always_on__gpu_24gb, 10) + assert ( + await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + ).get_availability() == 0 + + with pytest.raises(ExecutorUnavailable): + await dummy_manager.reserve_executor_class("token3", ExecutorClass.always_on__gpu_24gb, 10) + + +@pytest.mark.asyncio +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT", + 0, +) +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD", + 0.1, +) +async def test_manifest_update(dummy_manager): + pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + assert pool._count == 2 + + # Update manifest + dummy_manager.manifest = {ExecutorClass.always_on__gpu_24gb: 3} + + # Get pool again to trigger update + pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + assert pool._count == 3 + + +@pytest.mark.asyncio +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT", + 0, +) +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD", + 0.1, +) +async def test_concurrent_reservations(dummy_manager): + async def reserve(i): + try: + await dummy_manager.reserve_executor_class( + f"token{i}", ExecutorClass.always_on__gpu_24gb, 5 + ) + return True + except ExecutorUnavailable: + return False + + results = await asyncio.gather(*[reserve(i) for i in range(5)]) + assert results.count(True) == 2 + assert results.count(False) == 3