diff --git a/.gitignore b/.gitignore index e893aa2be..b6a7e4a5c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ *.sqlite3 *~ *.egg-info/ -/.idea/ +.idea/ .env venv .hypothesis diff --git a/compute_horde/compute_horde/executor_class.py b/compute_horde/compute_horde/executor_class.py index 1e61f07fa..ae2d83d20 100644 --- a/compute_horde/compute_horde/executor_class.py +++ b/compute_horde/compute_horde/executor_class.py @@ -68,8 +68,9 @@ class ExecutorClassSpec: } -# this leaves around 1 min for synthetic job to complete -MAX_EXECUTOR_TIMEOUT = timedelta(minutes=4).total_seconds() +# we split 144min 2 tempos window to 24 validators - this is total time after reservation, +# validator may wait spin_up time of executor class to synchronize running synthetic batch +MAX_EXECUTOR_TIMEOUT = timedelta(minutes=6).total_seconds() DEFAULT_EXECUTOR_CLASS = ExecutorClass.spin_up_4min__gpu_24gb DEFAULT_EXECUTOR_TIMEOUT = EXECUTOR_CLASS[DEFAULT_EXECUTOR_CLASS].spin_up_time diff --git a/compute_horde/compute_horde/test_base/__init__.py b/compute_horde/compute_horde/test_base/__init__.py index a8b9c069c..8168e809d 100644 --- a/compute_horde/compute_horde/test_base/__init__.py +++ b/compute_horde/compute_horde/test_base/__init__.py @@ -105,7 +105,7 @@ def start_process(cls, args, additional_env: dict[str, str]): @classmethod def wait_for_process_start(cls, process_name, probe_function, process: subprocess.Popen): - for i in range(300): + for _ in range(300): if probe_function(): return if process.poll() is not None: diff --git a/compute_horde/pyproject.toml b/compute_horde/pyproject.toml index 3de5a52fc..a625eb0ba 100644 --- a/compute_horde/pyproject.toml +++ b/compute_horde/pyproject.toml @@ -55,17 +55,18 @@ line-length = 100 [tool.ruff.lint] # TODO add D -select = ["E", "F", "I", "UP"] +select = ["E", "F", "I", "UP", "B"] # TODO: remove E501 once docstrings are formatted ignore = [ "D100", "D105", "D107", "D200", "D202", "D203", "D205", "D212", "D400", "D401", "D415", "D101", "D102","D103", "D104", # TODO remove once we have docstring for all public methods "E501", # TODO: remove E501 once docstrings are formatted + "B027", "B904", "B905", ] [tool.ruff.lint.per-file-ignores] "__init__.py" = ["F401"] -"test/**" = ["D", "F403", "F405"] +"**/tests/**" = ["D", "F403", "F405", "B018"] [tool.codespell] skip = 'pdm.lock' diff --git a/executor/bin/rotate-local-backups.py b/executor/bin/rotate-local-backups.py index 44eff24c6..57f47801f 100755 --- a/executor/bin/rotate-local-backups.py +++ b/executor/bin/rotate-local-backups.py @@ -18,7 +18,7 @@ def rotate_backups(path, file_count): files = files[:-file_count] if files: print(f"Removing {len(files)} old files") - for mtime, f in files: + for _mtime, f in files: f.unlink() else: print("No old files to remove") diff --git a/executor/pyproject.toml b/executor/pyproject.toml index da19a10a4..84c34d330 100644 --- a/executor/pyproject.toml +++ b/executor/pyproject.toml @@ -69,17 +69,18 @@ line-length = 100 [tool.ruff.lint] # TODO add D -select = ["E", "F", "I", "UP"] +select = ["E", "F", "I", "UP", "B"] # TODO: remove E501 once docstrings are formatted ignore = [ "D100", "D105", "D107", "D200", "D202", "D203", "D205", "D212", "D400", "D401", "D415", "D101", "D102","D103", "D104", # TODO remove once we have docstring for all public methods "E501", # TODO: remove E501 once docstrings are formatted + "B027", "B904", "B905", ] [tool.ruff.lint.per-file-ignores] "__init__.py" = ["F401"] -"test/**" = ["D", "F403", "F405"] +"**/tests/**" = ["D", "F403", "F405", "B018"] [tool.codespell] skip = '*.min.js,*.lock,*/monitoring_certs/*' diff --git a/miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py b/miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py index 35c0a652f..d0d8bafc9 100644 --- a/miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py +++ b/miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py @@ -5,6 +5,7 @@ import time from compute_horde.executor_class import ( + EXECUTOR_CLASS, MAX_EXECUTOR_TIMEOUT, ExecutorClass, ) @@ -29,6 +30,9 @@ def is_expired(self): class ExecutorClassPool: + RESERVATION_TIMEOUT = MAX_EXECUTOR_TIMEOUT + POOL_CLEANUP_PERIOD = 10 + def __init__(self, manager, executor_class: ExecutorClass, executor_count: int): self.manager = manager self.executor_class = executor_class @@ -42,7 +46,7 @@ async def reserve_executor(self, token, timeout): async with self._reservation_lock: while True: if self.get_availability() == 0: - if time.time() - start < MAX_EXECUTOR_TIMEOUT: + if time.time() - start < self.RESERVATION_TIMEOUT: await asyncio.sleep(1) else: logger.warning("Error unavailable after timeout") @@ -71,26 +75,36 @@ async def _pool_cleanup_loop(self): await self._pool_cleanup() except Exception as exc: logger.error("Error occurred", exc_info=exc) - await asyncio.sleep(10) + await asyncio.sleep(self.POOL_CLEANUP_PERIOD) async def _pool_cleanup(self): - executors_to_drop = set() - for reserved_executor in self._executors: + async def check_executor(reserved_executor): status = await self.manager.wait_for_executor(reserved_executor.executor, 1) if status is not None: - executors_to_drop.add(reserved_executor) - else: - if reserved_executor.is_expired(): - await self.manager.kill_executor(reserved_executor.executor) - executors_to_drop.add(reserved_executor) - still_running_executors = [] - for reserved_executor in self._executors: - if reserved_executor not in executors_to_drop: - still_running_executors.append(reserved_executor) - self._executors = still_running_executors + return reserved_executor, True + elif reserved_executor.is_expired(): + await self.manager.kill_executor(reserved_executor.executor) + return reserved_executor, True + return reserved_executor, False + + results = await asyncio.gather( + *[check_executor(reserved_executor) for reserved_executor in self._executors] + ) + + executors_to_drop = set( + reserved_executor for reserved_executor, should_drop in results if should_drop + ) + + self._executors = [ + reserved_executor + for reserved_executor in self._executors + if reserved_executor not in executors_to_drop + ] class BaseExecutorManager(metaclass=abc.ABCMeta): + EXECUTOR_TIMEOUT_LEEWAY = dt.timedelta(seconds=30).total_seconds() + def __init__(self): self._executor_class_pools = {} @@ -119,7 +133,7 @@ async def get_manifest(self) -> dict[ExecutorClass, int]: Keys are executor class ids and values are number of supported executors for given executor class. """ - async def get_executor_class_pool(self, executor_class): + async def _sync_pools_with_manifest(self): manifest = await self.get_manifest() for executor_class, executor_count in manifest.items(): pool = self._executor_class_pools.get(executor_class) @@ -128,8 +142,18 @@ async def get_executor_class_pool(self, executor_class): self._executor_class_pools[executor_class] = pool else: pool.set_count(executor_count) + + async def get_executor_class_pool(self, executor_class): + await self._sync_pools_with_manifest() return self._executor_class_pools[executor_class] async def reserve_executor_class(self, token, executor_class, timeout): pool = await self.get_executor_class_pool(executor_class) - await pool.reserve_executor(token, timeout) + await pool.reserve_executor(token, self.get_total_timeout(executor_class, timeout)) + + def get_total_timeout(self, executor_class, job_timeout): + spec = EXECUTOR_CLASS.get(executor_class) + spin_up_time = 0 + if spec is not None: + spin_up_time = spec.spin_up_time + return spin_up_time + job_timeout + self.EXECUTOR_TIMEOUT_LEEWAY diff --git a/miner/app/src/compute_horde_miner/miner/tests/test_executor_manager.py b/miner/app/src/compute_horde_miner/miner/tests/test_executor_manager.py new file mode 100644 index 000000000..46961c7a2 --- /dev/null +++ b/miner/app/src/compute_horde_miner/miner/tests/test_executor_manager.py @@ -0,0 +1,208 @@ +import asyncio +from contextlib import asynccontextmanager +from unittest.mock import patch + +import pytest +import pytest_asyncio +from compute_horde.executor_class import ExecutorClass + +from compute_horde_miner.miner.executor_manager._internal.base import ( + BaseExecutorManager, + ExecutorUnavailable, +) + + +class DummyExecutor: + def __init__(self, execution_time): + self.execution_time = execution_time + self.task = None + + async def run(self): + try: + await asyncio.sleep(self.execution_time) + return "Completed" + except asyncio.CancelledError: + return "Cancelled" + + +class DummyExecutorManager(BaseExecutorManager): + def __init__(self, manifest, runtime_offset=0): + super().__init__() + self.manifest = manifest + self.executors = [] + self.runtime_offset = runtime_offset + self.EXECUTOR_TIMEOUT_LEEWAY = 0 + + @asynccontextmanager + async def set_runtime_offset(self, offset): + old_offset = self.runtime_offset + self.runtime_offset = offset + try: + yield + finally: + self.runtime_offset = old_offset + + async def start_new_executor(self, token, executor_class, timeout): + executor = DummyExecutor(timeout + self.runtime_offset) + executor.task = asyncio.create_task(executor.run()) + self.executors.append(executor) + return executor + + async def kill_executor(self, executor): + if executor.task and not executor.task.done(): + executor.task.cancel() + if executor in self.executors: + self.executors.remove(executor) + + async def wait_for_executor(self, executor, timeout): + try: + return await asyncio.wait_for(asyncio.shield(executor.task), timeout) + except TimeoutError: + return None + + async def get_manifest(self): + return self.manifest + + +@pytest_asyncio.fixture +async def dummy_manager(): + manifest = { + ExecutorClass.always_on__gpu_24gb: 2, + } + manager = DummyExecutorManager(manifest, runtime_offset=-2) + yield manager + for pool in manager._executor_class_pools.values(): + pool._pool_cleanup_task.cancel() + try: + await pool._pool_cleanup_task + except asyncio.CancelledError: + pass + + +@pytest.mark.asyncio +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT", + 0, +) +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD", + 0.1, +) +async def test_executor_class_pool(dummy_manager): + # Test reserving executors + pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + + executor1 = await pool.reserve_executor("token1", 2) + assert isinstance(executor1, DummyExecutor) + assert pool.get_availability() == 1 + + executor2 = await pool.reserve_executor("token2", 7) + assert isinstance(executor2, DummyExecutor) + assert pool.get_availability() == 0 + + # Test ExecutorUnavailable exception + with pytest.raises(ExecutorUnavailable): + await pool.reserve_executor("token3", 20) + + # Test executor completion + status = await dummy_manager.wait_for_executor(executor1, 1) + assert status == "Completed" + status = await dummy_manager.wait_for_executor(executor2, 1) + assert status is None + + # Allow time for pool cleanup + await asyncio.sleep(1) + assert pool.get_availability() == 1 + + # Test executor completion + status = await dummy_manager.wait_for_executor(executor2, 7) + assert status == "Completed" + + # Allow time for pool cleanup + await asyncio.sleep(1) + assert pool.get_availability() == 2 + + # Test long-running executor + async with dummy_manager.set_runtime_offset(5): + long_running_executor = await pool.reserve_executor("token4", 5) + + # Wait a bit, but not long enough for the executor to complete + status = await dummy_manager.wait_for_executor(long_running_executor, 2) + assert status is None + + # Wait for the executor to be killed by the cleanup process + status = await dummy_manager.wait_for_executor(long_running_executor, 10) + assert status == "Cancelled" + + # Allow time for pool cleanup + await asyncio.sleep(1) + assert pool.get_availability() == 2 + + +@pytest.mark.asyncio +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT", + 0, +) +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD", + 0.1, +) +async def test_manager_reserve_executor_class(dummy_manager): + await dummy_manager.reserve_executor_class("token1", ExecutorClass.always_on__gpu_24gb, 10) + assert ( + await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + ).get_availability() == 1 + + await dummy_manager.reserve_executor_class("token2", ExecutorClass.always_on__gpu_24gb, 10) + assert ( + await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + ).get_availability() == 0 + + with pytest.raises(ExecutorUnavailable): + await dummy_manager.reserve_executor_class("token3", ExecutorClass.always_on__gpu_24gb, 10) + + +@pytest.mark.asyncio +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT", + 0, +) +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD", + 0.1, +) +async def test_manifest_update(dummy_manager): + pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + assert pool._count == 2 + + # Update manifest + dummy_manager.manifest = {ExecutorClass.always_on__gpu_24gb: 3} + + # Get pool again to trigger update + pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb) + assert pool._count == 3 + + +@pytest.mark.asyncio +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT", + 0, +) +@patch( + "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD", + 0.1, +) +async def test_concurrent_reservations(dummy_manager): + async def reserve(i): + try: + await dummy_manager.reserve_executor_class( + f"token{i}", ExecutorClass.always_on__gpu_24gb, 5 + ) + return True + except ExecutorUnavailable: + return False + + results = await asyncio.gather(*[reserve(i) for i in range(5)]) + assert results.count(True) == 2 + assert results.count(False) == 3 diff --git a/miner/bin/rotate-local-backups.py b/miner/bin/rotate-local-backups.py index 44eff24c6..57f47801f 100755 --- a/miner/bin/rotate-local-backups.py +++ b/miner/bin/rotate-local-backups.py @@ -18,7 +18,7 @@ def rotate_backups(path, file_count): files = files[:-file_count] if files: print(f"Removing {len(files)} old files") - for mtime, f in files: + for _mtime, f in files: f.unlink() else: print("No old files to remove") diff --git a/miner/pyproject.toml b/miner/pyproject.toml index 181621c72..359e55393 100644 --- a/miner/pyproject.toml +++ b/miner/pyproject.toml @@ -70,17 +70,18 @@ line-length = 100 [tool.ruff.lint] # TODO add D -select = ["E", "F", "I", "UP"] +select = ["E", "F", "I", "UP", "B"] # TODO: remove E501 once docstrings are formatted ignore = [ "D100", "D105", "D107", "D200", "D202", "D203", "D205", "D212", "D400", "D401", "D415", "D101", "D102","D103", "D104", # TODO remove once we have docstring for all public methods "E501", # TODO: remove E501 once docstrings are formatted + "B027", "B904", "B905", ] [tool.ruff.lint.per-file-ignores] "__init__.py" = ["F401"] -"test/**" = ["D", "F403", "F405"] +"**/tests/**" = ["D", "F403", "F405", "B018"] [tool.codespell] skip = '*.min.js,*.lock,*/monitoring_certs/*' diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py index b9a12ebdc..d9ccf1baf 100644 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py +++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/facilitator_client.py @@ -41,12 +41,14 @@ def __init__(self, reason: str, errors: list[Error]): self.errors = errors -async def save_facilitator_event(subtype: str, long_description: str, data={}, success=False): +async def save_facilitator_event( + subtype: str, long_description: str, data: dict[str, str] | None = None, success=False +): await SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).acreate( type=SystemEvent.EventType.FACILITATOR_CLIENT_ERROR, subtype=subtype, long_description=long_description, - data=data, + data=data or {}, ) diff --git a/validator/app/src/compute_horde_validator/validator/tests/helpers.py b/validator/app/src/compute_horde_validator/validator/tests/helpers.py index 2bb5429d6..b222779e2 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/helpers.py +++ b/validator/app/src/compute_horde_validator/validator/tests/helpers.py @@ -208,11 +208,7 @@ def __init__( mocked_commit_weights=lambda: (True, ""), mocked_reveal_weights=lambda: (True, ""), mocked_metagraph=lambda: MockMetagraph(), - hyperparameters=MockHyperparameters( - commit_reveal_weights_enabled=False, - commit_reveal_weights_interval=1000, - max_weight_limit=65535, - ), + hyperparameters=None, block_duration=timedelta(seconds=1), override_block_number=None, increase_block_number_with_each_call=False, @@ -222,7 +218,11 @@ def __init__( self.mocked_commit_weights = mocked_commit_weights self.mocked_reveal_weights = mocked_reveal_weights self.mocked_metagraph = mocked_metagraph - self.hyperparameters = hyperparameters + self.hyperparameters = hyperparameters or MockHyperparameters( + commit_reveal_weights_enabled=False, + commit_reveal_weights_interval=1000, + max_weight_limit=65535, + ) self.weights_set: list[list[numbers.Number]] = [] self.weights_committed: list[list[numbers.Number]] = [] self.weights_revealed: list[list[numbers.Number]] = [] diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_commands.py b/validator/app/src/compute_horde_validator/validator/tests/test_commands.py index ffd7bc490..5de274d70 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_commands.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_commands.py @@ -85,7 +85,7 @@ def test_debug_run_organic_job_command__job_not_created(): Miner.objects.create(hotkey="miner_client") with redirect_stdout(io.StringIO()) as buf: - with pytest.raises(BaseException): + with pytest.raises(SystemExit): management.call_command( "debug_run_organic_job", docker_image="noop", timeout=4, cmd_args="" ) diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py b/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py index 149deb07e..9b7175837 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_facilitator_client.py @@ -150,7 +150,7 @@ async def test_facilitator_client(ws_server_cls): facilitator_client.specs_task.cancel() task.cancel() if ws_server.facilitator_error: - assert False, ws_server.facilitator_error + pytest.fail(str(ws_server.facilitator_error)) @pytest.fixture diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_utils.py b/validator/app/src/compute_horde_validator/validator/tests/test_utils.py index 33fa9da13..2df8d30cb 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_utils.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_utils.py @@ -469,7 +469,7 @@ async def interaction_callback(miner_client, after_job_sent): miner_client = mocked_synthetic_miner_client.instance async for job in SyntheticJob.objects.filter(job_uuid__in=job_uuids): receipt = miner_client._query_sent_models( - lambda m: m.payload.job_uuid == str(job.job_uuid), V0JobFinishedReceiptRequest + lambda m, j=job: m.payload.job_uuid == str(j.job_uuid), V0JobFinishedReceiptRequest )[0] time_took = receipt.payload.time_took_us / 1_000_000 assert abs(job.score * time_took - expected_multiplier) < 0.0001 @@ -556,7 +556,7 @@ async def interaction_callback(miner_client, after_job_sent): miner_client = mocked_synthetic_miner_client.instance for job in SyntheticJob.objects.filter(job_uuid__in=job_uuids): receipt = miner_client._query_sent_models( - lambda m: m.payload.job_uuid == str(job.job_uuid), V0JobFinishedReceiptRequest + lambda m, j=job: m.payload.job_uuid == str(j.job_uuid), V0JobFinishedReceiptRequest )[0] time_took = receipt.payload.time_took_us / 1_000_000 assert abs(job.score * time_took - expected_multiplier) < 0.0001 diff --git a/validator/bin/rotate-local-backups.py b/validator/bin/rotate-local-backups.py index 44eff24c6..57f47801f 100755 --- a/validator/bin/rotate-local-backups.py +++ b/validator/bin/rotate-local-backups.py @@ -18,7 +18,7 @@ def rotate_backups(path, file_count): files = files[:-file_count] if files: print(f"Removing {len(files)} old files") - for mtime, f in files: + for _mtime, f in files: f.unlink() else: print("No old files to remove") diff --git a/validator/pyproject.toml b/validator/pyproject.toml index d879e3427..551830f29 100644 --- a/validator/pyproject.toml +++ b/validator/pyproject.toml @@ -78,7 +78,7 @@ line-length = 100 [tool.ruff.lint] # TODO add D select = [ - "E", "F", "I", "UP", + "E", "F", "I", "UP", "B", "TCH005", ] # TODO: remove E501 once docstrings are formatted @@ -86,11 +86,12 @@ ignore = [ "D100", "D105", "D107", "D200", "D202", "D203", "D205", "D212", "D400", "D401", "D415", "D101", "D102","D103", "D104", # TODO remove once we have docstring for all public methods "E501", # TODO: remove E501 once docstrings are formatted + "B027", "B904", "B905", ] [tool.ruff.lint.per-file-ignores] "__init__.py" = ["F401"] -"test/**" = ["D", "F403", "F405"] +"**/tests/**" = ["D", "F403", "F405", "B018"] [tool.codespell] skip = '*.min.js,*.lock,*/monitoring_certs/*'