diff --git a/CHANGELOG.md b/CHANGELOG.md
index e7efda5f07..7db6356115 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,10 +12,11 @@
     * Sudo/SLURM executor checks the fractal-server version using `FRACTAL_SLURM_WORKER_PYTHON` config variable, if set (\#2240).
     * Handle `_COMPONENT_KEY_`-related errors in sudo/SLURM executor, to simplify testing (\#2245).
     * Drop obsolete `SlurmJob.workflow_task_file_prefix` for both SSH/sudo executors (\#2245).
+    * Drop obsolete `keep_pickle_files` attribute from SLURM executors (\#2246).
 * Dependencies:
     * Bump `uvicorn` version (\#2242).
 * Testing:
-    * Add tests for `scancel` during execution (\#2245).
+    * Improve testing of the sudo/SLURM executor (\#2245, \#2246).
 
 # 2.12.0
diff --git a/fractal_server/app/runner/executors/slurm/ssh/executor.py b/fractal_server/app/runner/executors/slurm/ssh/executor.py
index a99dd9d859..89ef4f4e2b 100644
--- a/fractal_server/app/runner/executors/slurm/ssh/executor.py
+++ b/fractal_server/app/runner/executors/slurm/ssh/executor.py
@@ -62,7 +62,6 @@ class FractalSlurmSSHExecutor(SlurmExecutor):
         shutdown_file:
         python_remote: Equal to `settings.FRACTAL_SLURM_WORKER_PYTHON`
         wait_thread_cls: Class for waiting thread
-        keep_pickle_files:
         workflow_dir_local:
             Directory for both the cfut/SLURM and fractal-server files and logs
         workflow_dir_remote:
@@ -84,7 +83,6 @@ class FractalSlurmSSHExecutor(SlurmExecutor):
     python_remote: str
 
     wait_thread_cls = FractalSlurmWaitThread
-    keep_pickle_files: bool
 
     common_script_lines: list[str]
     slurm_account: Optional[str]
@@ -100,8 +98,6 @@ def __init__(
         # Folders and files
         workflow_dir_local: Path,
         workflow_dir_remote: Path,
-        # Runner options
-        keep_pickle_files: bool = False,
         # Monitoring options
         slurm_poll_interval: Optional[int] = None,
         # SLURM submission script options
@@ -120,7 +116,6 @@ def __init__(
             fractal_ssh:
             workflow_dir_local:
             workflow_dir_remote:
-            keep_pickle_files:
             slurm_poll_interval:
             common_script_lines:
             slurm_account:
@@ -194,7 +189,6 @@ def __init__(
             raise e
 
         # Set/initialize some more options
-        self.keep_pickle_files = keep_pickle_files
         self.map_jobid_to_slurm_files_local = {}
 
     def _validate_common_script_lines(self):
@@ -901,12 +895,11 @@ def _handle_remaining_jobs(
             pass
         for job_id in remaining_job_ids:
             self._cleanup(job_id)
-        if not self.keep_pickle_files:
-            for job in remaining_jobs:
-                for path in job.output_pickle_files_local:
-                    path.unlink()
-                for path in job.input_pickle_files_local:
-                    path.unlink()
+        for job in remaining_jobs:
+            for path in job.output_pickle_files_local:
+                path.unlink()
+            for path in job.input_pickle_files_local:
+                path.unlink()
 
     def _completion(self, job_ids: list[str]) -> None:
         """
@@ -1001,8 +994,7 @@ def _completion(self, job_ids: list[str]) -> None:
                             f"Future {future} (SLURM job ID: {job_id}) "
                             "was already cancelled."
                         )
-                        if not self.keep_pickle_files:
-                            in_path.unlink()
+                        in_path.unlink()
                         self._cleanup(job_id)
                         self._handle_remaining_jobs(
                             remaining_futures=remaining_futures,
@@ -1062,17 +1054,15 @@ def _completion(self, job_ids: list[str]) -> None:
                                 remaining_job_ids=remaining_job_ids,
                             )
                             return
-                        if not self.keep_pickle_files:
-                            out_path.unlink()
+                        out_path.unlink()
                     except InvalidStateError:
                         logger.warning(
                             f"Future {future} (SLURM job ID: {job_id}) was "
                             "already cancelled, exit from "
                             "FractalSlurmSSHExecutor._completion."
                         )
-                        if not self.keep_pickle_files:
-                            out_path.unlink()
-                            in_path.unlink()
+                        out_path.unlink()
+                        in_path.unlink()
                         self._cleanup(job_id)
                         self._handle_remaining_jobs(
@@ -1082,8 +1072,7 @@ def _completion(self, job_ids: list[str]) -> None:
                         return
 
                 # Clean up input pickle file
-                if not self.keep_pickle_files:
-                    in_path.unlink()
+                in_path.unlink()
                 self._cleanup(job_id)
                 if job.single_task_submission:
                     future.set_result(outputs[0])
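Note on the hunks above: `keep_pickle_files` previously let callers opt out of deleting the per-task input/output pickle files, and with its removal the cleanup becomes unconditional. The behavioral core can be summarized with a minimal sketch; the helper name `cleanup_pickle_files` and the `missing_ok=True` flag are illustrative assumptions of this sketch, not code from the repository (the diff itself only calls plain `path.unlink()` on files it expects to exist):

from pathlib import Path


def cleanup_pickle_files(
    input_pickle_files: list[Path],
    output_pickle_files: list[Path],
) -> None:
    # Post-#2246 behavior: pickle files are always removed once a job
    # completes; there is no longer an opt-out flag.
    for path in (*input_pickle_files, *output_pickle_files):
        # missing_ok=True makes this sketch idempotent; the actual
        # executor calls path.unlink() without it.
        path.unlink(missing_ok=True)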
diff --git a/fractal_server/app/runner/executors/slurm/sudo/executor.py b/fractal_server/app/runner/executors/slurm/sudo/executor.py
index 3b63aef3ec..34faaa6535 100644
--- a/fractal_server/app/runner/executors/slurm/sudo/executor.py
+++ b/fractal_server/app/runner/executors/slurm/sudo/executor.py
@@ -219,7 +219,6 @@ class FractalSlurmExecutor(SlurmExecutor):
     workflow_dir_local: Path
     workflow_dir_remote: Path
     map_jobid_to_slurm_files: dict[str, tuple[str, str, str]]
-    keep_pickle_files: bool
     slurm_account: Optional[str]
 
     jobs: dict[str, tuple[Future, SlurmJob]]
@@ -232,7 +231,6 @@ def __init__(
         user_cache_dir: Optional[str] = None,
         common_script_lines: Optional[list[str]] = None,
         slurm_poll_interval: Optional[int] = None,
-        keep_pickle_files: bool = False,
         slurm_account: Optional[str] = None,
         *args,
         **kwargs,
@@ -253,7 +251,6 @@ def __init__(
         # raised within `__init__`).
         self.wait_thread.shutdown_callback = self.shutdown
 
-        self.keep_pickle_files = keep_pickle_files
         self.slurm_user = slurm_user
         self.slurm_account = slurm_account
 
@@ -874,8 +871,7 @@ def _completion(self, jobid: str) -> None:
                     " cancelled, exit from"
                     " FractalSlurmExecutor._completion."
                 )
-                if not self.keep_pickle_files:
-                    in_path.unlink()
+                in_path.unlink()
                 self._cleanup(jobid)
                 return
 
@@ -917,23 +913,20 @@ def _completion(self, jobid: str) -> None:
                         exc = TaskExecutionError(proxy.tb, **kwargs)
                         fut.set_exception(exc)
                         return
-                if not self.keep_pickle_files:
-                    out_path.unlink()
+                out_path.unlink()
             except InvalidStateError:
                 logger.warning(
                     f"Future {fut} (SLURM job ID: {jobid}) was already"
                     " cancelled, exit from"
                     " FractalSlurmExecutor._completion."
                 )
-                if not self.keep_pickle_files:
-                    out_path.unlink()
-                    in_path.unlink()
+                out_path.unlink()
+                in_path.unlink()
                 self._cleanup(jobid)
                 return
 
             # Clean up input pickle file
-            if not self.keep_pickle_files:
-                in_path.unlink()
+            in_path.unlink()
             self._cleanup(jobid)
             if job.single_task_submission:
                 fut.set_result(outputs[0])
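The sudo executor's `_completion` hunks mirror the SSH ones: `fut.set_result`/`fut.set_exception` raise `InvalidStateError` when the future was cancelled in the meantime (e.g. via `scancel`), and the except-branch still unlinks the pickle files before calling `_cleanup`. A standalone sketch of that pattern, under the assumption that the helper name `complete_future_safely` is illustrative and not part of the codebase:

from concurrent.futures import Future, InvalidStateError


def complete_future_safely(fut: Future, result: object) -> bool:
    # Returns True if the result was delivered, False if the future was
    # already cancelled; callers run their file cleanup in both cases.
    try:
        fut.set_result(result)
        return True
    except InvalidStateError:
        return False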
diff --git a/tests/v2/09_backends/test_unit_slurm_executor.py b/tests/v2/09_backends/test_sudo_slurm_executor_scancel.py
similarity index 55%
rename from tests/v2/09_backends/test_unit_slurm_executor.py
rename to tests/v2/09_backends/test_sudo_slurm_executor_scancel.py
index 410e6c3fed..24ec2cda36 100644
--- a/tests/v2/09_backends/test_unit_slurm_executor.py
+++ b/tests/v2/09_backends/test_sudo_slurm_executor_scancel.py
@@ -5,7 +5,6 @@
 from pathlib import Path
 
 import pytest
-from devtools import debug
 
 from fractal_server.app.runner.exceptions import JobExecutionError
 from fractal_server.app.runner.executors.slurm.sudo._subprocess_run_as_user import (  # noqa: E501
@@ -13,96 +12,12 @@
 )
 from fractal_server.app.runner.executors.slurm.sudo.executor import (
     FractalSlurmExecutor,
-)  # noqa
-from fractal_server.logger import set_logger
+)
 from tests.fixtures_slurm import run_squeue
 from tests.fixtures_slurm import SLURM_USER
 from tests.v2._aux_runner import get_default_slurm_config
 from tests.v2._aux_runner import get_default_task_files
 
-logger = set_logger(__file__)
-
-
-def test_slurm_sudo_executor_shutdown_before_job_submission(
-    tmp_path: Path,
-    override_settings_factory,
-    current_py_version: str,
-):
-    """
-    Verify the behavior when shutdown is called before any job has started.
-    """
-
-    override_settings_factory(FRACTAL_SLURM_WORKER_PYTHON=None)
-
-    with FractalSlurmExecutor(
-        workflow_dir_local=tmp_path / "job_dir1",
-        workflow_dir_remote=tmp_path / "remote_job_dir1",
-        slurm_user="TEST",
-        slurm_poll_interval=1,
-    ) as executor:
-        executor.shutdown()
-        with pytest.raises(JobExecutionError) as exc_info:
-            fut = executor.submit(
-                lambda: 1,
-                slurm_config=get_default_slurm_config(),
-                task_files=get_default_task_files(
-                    workflow_dir_local=executor.workflow_dir_local,
-                    workflow_dir_remote=executor.workflow_dir_remote,
-                ),
-            )
-            fut.result()
-        debug(exc_info.value)
-
-    with FractalSlurmExecutor(
-        workflow_dir_local=tmp_path / "job_dir1",
-        workflow_dir_remote=tmp_path / "remote_job_dir1",
-        slurm_user="TEST",
-        slurm_poll_interval=1,
-    ) as executor:
-        executor.shutdown()
-        with pytest.raises(JobExecutionError) as exc_info:
-            fut = executor.map(
-                lambda x: 1,
-                [1, 2, 3],
-                slurm_config=get_default_slurm_config(),
-                task_files=get_default_task_files(
-                    workflow_dir_local=executor.workflow_dir_local,
-                    workflow_dir_remote=executor.workflow_dir_remote,
-                ),
-            )
-            fut.result()
-        debug(exc_info.value)
-
-    with FractalSlurmExecutor(
-        workflow_dir_local=tmp_path / "job_dir1",
-        workflow_dir_remote=tmp_path / "remote_job_dir1",
-        slurm_user="TEST",
-        slurm_poll_interval=1,
-    ) as executor:
-        executor.shutdown()
-        with pytest.raises(JobExecutionError) as exc_info:
-            executor.wait_thread.wait(filenames=("some", "thing"), jobid=1)
-        debug(exc_info.value)
-
-    with FractalSlurmExecutor(
-        workflow_dir_local=tmp_path / "job_dir1",
-        workflow_dir_remote=tmp_path / "remote_job_dir1",
-        slurm_user="TEST",
-        slurm_poll_interval=1,
-    ) as executor:
-        executor.shutdown()
-        with pytest.raises(JobExecutionError) as exc_info:
-            executor._submit_job(
-                lambda x: x,
-                slurm_file_prefix="test",
-                task_files=get_default_task_files(
-                    workflow_dir_local=executor.workflow_dir_local,
-                    workflow_dir_remote=executor.workflow_dir_remote,
-                ),
-                slurm_config=get_default_slurm_config(),
-            )
-        debug(exc_info.value)
-
 
 async def test_scancel_during_execution(
     tmp777_path: Path, monkey_slurm, slurm_working_folders
diff --git a/tests/v2/09_backends/test_sudo_slurm_executor_unit.py b/tests/v2/09_backends/test_sudo_slurm_executor_unit.py
new file mode 100644
index 0000000000..5ee900e843
--- /dev/null
+++ b/tests/v2/09_backends/test_sudo_slurm_executor_unit.py
@@ -0,0 +1,205 @@
+import json
+from pathlib import Path
+
+import pytest
+from devtools import debug
+
+from fractal_server.app.runner.exceptions import JobExecutionError
+from fractal_server.app.runner.executors.slurm.sudo.executor import (
+    FractalSlurmExecutor,
+)
+from fractal_server.app.runner.executors.slurm.sudo.executor import SlurmJob
+from tests.v2._aux_runner import get_default_slurm_config
+from tests.v2._aux_runner import get_default_task_files
+
+
+def test_slurm_sudo_executor_shutdown_before_job_submission(
+    tmp_path: Path,
+    override_settings_factory,
+    current_py_version: str,
+):
+    """
+    Verify the behavior when shutdown is called before any job has started.
+    """
+
+    override_settings_factory(FRACTAL_SLURM_WORKER_PYTHON=None)
+
+    with FractalSlurmExecutor(
+        workflow_dir_local=tmp_path / "job_dir1",
+        workflow_dir_remote=tmp_path / "remote_job_dir1",
+        slurm_user="TEST",
+        slurm_poll_interval=1,
+    ) as executor:
+        executor.shutdown()
+        with pytest.raises(JobExecutionError) as exc_info:
+            fut = executor.submit(
+                lambda: 1,
+                slurm_config=get_default_slurm_config(),
+                task_files=get_default_task_files(
+                    workflow_dir_local=executor.workflow_dir_local,
+                    workflow_dir_remote=executor.workflow_dir_remote,
+                ),
+            )
+            fut.result()
+        debug(exc_info.value)
+
+    with FractalSlurmExecutor(
+        workflow_dir_local=tmp_path / "job_dir1",
+        workflow_dir_remote=tmp_path / "remote_job_dir1",
+        slurm_user="TEST",
+        slurm_poll_interval=1,
+    ) as executor:
+        executor.shutdown()
+        with pytest.raises(JobExecutionError) as exc_info:
+            fut = executor.map(
+                lambda x: 1,
+                [1, 2, 3],
+                slurm_config=get_default_slurm_config(),
+                task_files=get_default_task_files(
+                    workflow_dir_local=executor.workflow_dir_local,
+                    workflow_dir_remote=executor.workflow_dir_remote,
+                ),
+            )
+            fut.result()
+        debug(exc_info.value)
+
+    with FractalSlurmExecutor(
+        workflow_dir_local=tmp_path / "job_dir1",
+        workflow_dir_remote=tmp_path / "remote_job_dir1",
+        slurm_user="TEST",
+        slurm_poll_interval=1,
+    ) as executor:
+        executor.shutdown()
+        with pytest.raises(JobExecutionError) as exc_info:
+            executor.wait_thread.wait(filenames=("some", "thing"), jobid=1)
+        debug(exc_info.value)
+
+    with FractalSlurmExecutor(
+        workflow_dir_local=tmp_path / "job_dir1",
+        workflow_dir_remote=tmp_path / "remote_job_dir1",
+        slurm_user="TEST",
+        slurm_poll_interval=1,
+    ) as executor:
+        executor.shutdown()
+        with pytest.raises(JobExecutionError) as exc_info:
+            executor._submit_job(
+                lambda x: x,
+                slurm_file_prefix="test",
+                task_files=get_default_task_files(
+                    workflow_dir_local=executor.workflow_dir_local,
+                    workflow_dir_remote=executor.workflow_dir_remote,
+                ),
+                slurm_config=get_default_slurm_config(),
+            )
+        debug(exc_info.value)
+
+
+def test_check_remote_runner_python_interpreter(
+    monkeypatch,
+    override_settings_factory,
+):
+    override_settings_factory(FRACTAL_SLURM_WORKER_PYTHON="/remote/python")
+
+    with pytest.raises(
+        RuntimeError, match="No such file or directory: '/remote/python'"
+    ):
+        FractalSlurmExecutor(
+            slurm_user="test_user",
+            workflow_dir_local=Path("/local/workflow"),
+            workflow_dir_remote=Path("/remote/workflow"),
+        )
+
+    def mock_subprocess_run_or_raise(cmd):
+        class MockCompletedProcess(object):
+            stdout: str = json.dumps({"fractal_server": "9.9.9"})
+
+        return MockCompletedProcess()
+
+    monkeypatch.setattr(
+        (
+            "fractal_server.app.runner.executors.slurm.sudo.executor"
+            "._subprocess_run_or_raise"
+        ),
+        mock_subprocess_run_or_raise,
+    )
+
+    with pytest.raises(RuntimeError, match="Fractal-server version mismatch"):
+        FractalSlurmExecutor(
+            slurm_user="test_user",
+            workflow_dir_local=Path("/local/workflow"),
+            workflow_dir_remote=Path("/remote/workflow"),
+        )
+
+
+def test_SlurmJob():
+    job = SlurmJob(
+        single_task_submission=False,
+        num_tasks_tot=2,
+        wftask_file_prefixes=("0", "1"),
+        slurm_config=get_default_slurm_config(),
+        slurm_file_prefix="prefix",
+    )
+    assert job.wftask_file_prefixes == ("0", "1")
+
+    job = SlurmJob(
+        single_task_submission=False,
+        num_tasks_tot=2,
+        wftask_file_prefixes=None,
+        slurm_config=get_default_slurm_config(),
+        slurm_file_prefix="prefix",
+    )
+    assert job.wftask_file_prefixes == (
+        "default_wftask_prefix",
+        "default_wftask_prefix",
+    )
+
+    with pytest.raises(ValueError, match="Trying to initialize"):
+        SlurmJob(
+            single_task_submission=True,
+            num_tasks_tot=2,
+            wftask_file_prefixes=("0", "1"),
+            slurm_config=get_default_slurm_config(),
+            slurm_file_prefix="prefix",
+        )
+
+
+def test_FractalSlurmExecutor_init(
+    tmp_path,
+    override_settings_factory,
+):
+
+    override_settings_factory(FRACTAL_SLURM_WORKER_PYTHON=None)
+
+    with pytest.raises(
+        RuntimeError,
+        match="Missing attribute FractalSlurmExecutor.slurm_user",
+    ):
+        with FractalSlurmExecutor(
+            slurm_user=None,
+            workflow_dir_local=tmp_path / "job_dir1",
+            workflow_dir_remote=tmp_path / "remote_job_dir1",
+        ):
+            pass
+
+    with pytest.raises(
+        RuntimeError,
+        match="Missing attribute FractalSlurmExecutor.slurm_user",
+    ):
+        with FractalSlurmExecutor(
+            slurm_user="",
+            workflow_dir_local=tmp_path / "job_dir1",
+            workflow_dir_remote=tmp_path / "remote_job_dir1",
+        ):
+            pass
+
+    with pytest.raises(
+        RuntimeError,
+        match="SLURM account must be set via the request body",
+    ):
+        with FractalSlurmExecutor(
+            slurm_user="something",
+            workflow_dir_local=tmp_path / "job_dir1",
+            workflow_dir_remote=tmp_path / "remote_job_dir1",
+            common_script_lines=["#SBATCH --account=myaccount"],
+        ):
+            pass
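For context on `test_check_remote_runner_python_interpreter` above: the executor asks the remote interpreter for its `fractal_server` version (here mocked as `{"fractal_server": "9.9.9"}` on stdout) and raises when it differs from the local one. A rough sketch of such a check; the function name and exact message format are assumptions of this sketch, not the executor's actual code:

import json
from importlib.metadata import version


def check_versions_match(remote_stdout: str) -> None:
    # Compare the remote worker's fractal-server version with the
    # locally installed one, and fail fast on any mismatch.
    local_version = version("fractal-server")
    remote_version = json.loads(remote_stdout)["fractal_server"]
    if remote_version != local_version:
        raise RuntimeError(
            "Fractal-server version mismatch: "
            f"local={local_version}, remote={remote_version}"
        )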
"._subprocess_run_or_raise" - ), - mock_subprocess_run_or_raise, - ) - - with pytest.raises(RuntimeError, match="Fractal-server version mismatch"): - FractalSlurmExecutor( - slurm_user="test_user", - workflow_dir_local=Path("/local/workflow"), - workflow_dir_remote=Path("/remote/workflow"), - )