
Merge pull request #2246 from fractal-analytics-platform/executor-cleanup

Review sudo/SLURM executor and its tests
tcompa authored Feb 4, 2025
2 parents 6b39b31 + 71b741c commit e71d69a
Showing 6 changed files with 223 additions and 165 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -12,10 +12,11 @@
* Sudo/SLURM executor checks the fractal-server version using `FRACTAL_SLURM_WORKER_PYTHON` config variable, if set (\#2240).
* Handle `_COMPONENT_KEY_`-related errors in sudo/SLURM executor, to simplify testing (\#2245).
* Drop obsolete `SlurmJob.workflow_task_file_prefix` for both SSH/sudo executors (\#2245).
* Drop obsolete `keep_pickle_files` attribute from slurm executors (\#2246).
* Dependencies:
* Bump `uvicorn` version (\#2242).
* Testing:
* Add tests for `scancel` during execution (\#2245).
* Improve testing of sudo-Slurm executor (\#2245, \#2246).

# 2.12.0

31 changes: 10 additions & 21 deletions fractal_server/app/runner/executors/slurm/ssh/executor.py
@@ -62,7 +62,6 @@ class FractalSlurmSSHExecutor(SlurmExecutor):
shutdown_file:
python_remote: Equal to `settings.FRACTAL_SLURM_WORKER_PYTHON`
wait_thread_cls: Class for waiting thread
keep_pickle_files:
workflow_dir_local:
Directory for both the cfut/SLURM and fractal-server files and logs
workflow_dir_remote:
@@ -84,7 +83,6 @@ class FractalSlurmSSHExecutor(SlurmExecutor):
python_remote: str

wait_thread_cls = FractalSlurmWaitThread
keep_pickle_files: bool

common_script_lines: list[str]
slurm_account: Optional[str]
@@ -100,8 +98,6 @@ def __init__(
# Folders and files
workflow_dir_local: Path,
workflow_dir_remote: Path,
# Runner options
keep_pickle_files: bool = False,
# Monitoring options
slurm_poll_interval: Optional[int] = None,
# SLURM submission script options
@@ -120,7 +116,6 @@ def __init__(
fractal_ssh:
workflow_dir_local:
workflow_dir_remote:
keep_pickle_files:
slurm_poll_interval:
common_script_lines:
slurm_account:
@@ -194,7 +189,6 @@ def __init__(
raise e

# Set/initialize some more options
self.keep_pickle_files = keep_pickle_files
self.map_jobid_to_slurm_files_local = {}

def _validate_common_script_lines(self):
@@ -901,12 +895,11 @@ def _handle_remaining_jobs(
pass
for job_id in remaining_job_ids:
self._cleanup(job_id)
if not self.keep_pickle_files:
for job in remaining_jobs:
for path in job.output_pickle_files_local:
path.unlink()
for path in job.input_pickle_files_local:
path.unlink()
for job in remaining_jobs:
for path in job.output_pickle_files_local:
path.unlink()
for path in job.input_pickle_files_local:
path.unlink()

def _completion(self, job_ids: list[str]) -> None:
"""
@@ -1001,8 +994,7 @@ def _completion(self, job_ids: list[str]) -> None:
f"Future {future} (SLURM job ID: {job_id}) "
"was already cancelled."
)
if not self.keep_pickle_files:
in_path.unlink()
in_path.unlink()
self._cleanup(job_id)
self._handle_remaining_jobs(
remaining_futures=remaining_futures,
@@ -1062,17 +1054,15 @@ def _completion(self, job_ids: list[str]) -> None:
remaining_job_ids=remaining_job_ids,
)
return
if not self.keep_pickle_files:
out_path.unlink()
out_path.unlink()
except InvalidStateError:
logger.warning(
f"Future {future} (SLURM job ID: {job_id}) was "
"already cancelled, exit from "
"FractalSlurmSSHExecutor._completion."
)
if not self.keep_pickle_files:
out_path.unlink()
in_path.unlink()
out_path.unlink()
in_path.unlink()

self._cleanup(job_id)
self._handle_remaining_jobs(
@@ -1082,8 +1072,7 @@
return

# Clean up input pickle file
if not self.keep_pickle_files:
in_path.unlink()
in_path.unlink()
self._cleanup(job_id)
if job.single_task_submission:
future.set_result(outputs[0])
17 changes: 5 additions & 12 deletions fractal_server/app/runner/executors/slurm/sudo/executor.py
@@ -219,7 +219,6 @@ class FractalSlurmExecutor(SlurmExecutor):
workflow_dir_local: Path
workflow_dir_remote: Path
map_jobid_to_slurm_files: dict[str, tuple[str, str, str]]
keep_pickle_files: bool
slurm_account: Optional[str]
jobs: dict[str, tuple[Future, SlurmJob]]

@@ -232,7 +231,6 @@ def __init__(
user_cache_dir: Optional[str] = None,
common_script_lines: Optional[list[str]] = None,
slurm_poll_interval: Optional[int] = None,
keep_pickle_files: bool = False,
slurm_account: Optional[str] = None,
*args,
**kwargs,
@@ -253,7 +251,6 @@
# raised within `__init__`).
self.wait_thread.shutdown_callback = self.shutdown

self.keep_pickle_files = keep_pickle_files
self.slurm_user = slurm_user
self.slurm_account = slurm_account

@@ -874,8 +871,7 @@ def _completion(self, jobid: str) -> None:
" cancelled, exit from"
" FractalSlurmExecutor._completion."
)
if not self.keep_pickle_files:
in_path.unlink()
in_path.unlink()
self._cleanup(jobid)
return

@@ -917,23 +913,20 @@ def _completion(self, jobid: str) -> None:
exc = TaskExecutionError(proxy.tb, **kwargs)
fut.set_exception(exc)
return
if not self.keep_pickle_files:
out_path.unlink()
out_path.unlink()
except InvalidStateError:
logger.warning(
f"Future {fut} (SLURM job ID: {jobid}) was already"
" cancelled, exit from"
" FractalSlurmExecutor._completion."
)
if not self.keep_pickle_files:
out_path.unlink()
in_path.unlink()
out_path.unlink()
in_path.unlink()
self._cleanup(jobid)
return

# Clean up input pickle file
if not self.keep_pickle_files:
in_path.unlink()
in_path.unlink()
self._cleanup(jobid)
if job.single_task_submission:
fut.set_result(outputs[0])
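The executor hunks in both `ssh/executor.py` and `sudo/executor.py` follow the same pattern: the `keep_pickle_files` flag is dropped and the per-job input/output pickle files are always unlinked once a job completes or is cleaned up. Below is a minimal sketch of the resulting cleanup logic, with hypothetical names and simplified structure; it is not the actual fractal-server executor code.

```python
from pathlib import Path


def cleanup_job_pickles(
    input_pickle_files: list[Path],
    output_pickle_files: list[Path],
) -> None:
    """Unconditionally remove a job's pickle files after completion.

    Before #2246 this cleanup was guarded by a `keep_pickle_files`
    attribute; the flag is now gone, so the files are always removed.
    """
    for path in (*input_pickle_files, *output_pickle_files):
        # missing_ok=True tolerates files that were already removed,
        # e.g. by an earlier shutdown/cancellation path.
        path.unlink(missing_ok=True)
```

In the hunks above the `unlink()` calls keep their original form; the behavioral change is only the removal of the conditional guard.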
@@ -5,104 +5,19 @@
from pathlib import Path

import pytest
from devtools import debug

from fractal_server.app.runner.exceptions import JobExecutionError
from fractal_server.app.runner.executors.slurm.sudo._subprocess_run_as_user import ( # noqa: E501
_mkdir_as_user,
)
from fractal_server.app.runner.executors.slurm.sudo.executor import (
FractalSlurmExecutor,
) # noqa
from fractal_server.logger import set_logger
)
from tests.fixtures_slurm import run_squeue
from tests.fixtures_slurm import SLURM_USER
from tests.v2._aux_runner import get_default_slurm_config
from tests.v2._aux_runner import get_default_task_files

logger = set_logger(__file__)


def test_slurm_sudo_executor_shutdown_before_job_submission(
tmp_path: Path,
override_settings_factory,
current_py_version: str,
):
"""
Verify the behavior when shutdown is called before any job has started.
"""

override_settings_factory(FRACTAL_SLURM_WORKER_PYTHON=None)

with FractalSlurmExecutor(
workflow_dir_local=tmp_path / "job_dir1",
workflow_dir_remote=tmp_path / "remote_job_dir1",
slurm_user="TEST",
slurm_poll_interval=1,
) as executor:
executor.shutdown()
with pytest.raises(JobExecutionError) as exc_info:
fut = executor.submit(
lambda: 1,
slurm_config=get_default_slurm_config(),
task_files=get_default_task_files(
workflow_dir_local=executor.workflow_dir_local,
workflow_dir_remote=executor.workflow_dir_remote,
),
)
fut.result()
debug(exc_info.value)

with FractalSlurmExecutor(
workflow_dir_local=tmp_path / "job_dir1",
workflow_dir_remote=tmp_path / "remote_job_dir1",
slurm_user="TEST",
slurm_poll_interval=1,
) as executor:
executor.shutdown()
with pytest.raises(JobExecutionError) as exc_info:
fut = executor.map(
lambda x: 1,
[1, 2, 3],
slurm_config=get_default_slurm_config(),
task_files=get_default_task_files(
workflow_dir_local=executor.workflow_dir_local,
workflow_dir_remote=executor.workflow_dir_remote,
),
)
fut.result()
debug(exc_info.value)

with FractalSlurmExecutor(
workflow_dir_local=tmp_path / "job_dir1",
workflow_dir_remote=tmp_path / "remote_job_dir1",
slurm_user="TEST",
slurm_poll_interval=1,
) as executor:
executor.shutdown()
with pytest.raises(JobExecutionError) as exc_info:
executor.wait_thread.wait(filenames=("some", "thing"), jobid=1)
debug(exc_info.value)

with FractalSlurmExecutor(
workflow_dir_local=tmp_path / "job_dir1",
workflow_dir_remote=tmp_path / "remote_job_dir1",
slurm_user="TEST",
slurm_poll_interval=1,
) as executor:
executor.shutdown()
with pytest.raises(JobExecutionError) as exc_info:
executor._submit_job(
lambda x: x,
slurm_file_prefix="test",
task_files=get_default_task_files(
workflow_dir_local=executor.workflow_dir_local,
workflow_dir_remote=executor.workflow_dir_remote,
),
slurm_config=get_default_slurm_config(),
)
debug(exc_info.value)


async def test_scancel_during_execution(
tmp777_path: Path, monkey_slurm, slurm_working_folders
