Commit fc36b13
Merge branch 'main' into 2224-review-email-configuration
tcompa committed Feb 4, 2025
2 parents f166355 + e71d69a commit fc36b13
Showing 10 changed files with 359 additions and 355 deletions.
CHANGELOG.md (5 additions, 0 deletions)
@@ -10,8 +10,13 @@
 * Remove `run_migrations_offline` from `env.py` and make `run_migrations_online` sync (\#2239).
 * Runner
     * Sudo/SLURM executor checks the fractal-server version using `FRACTAL_SLURM_WORKER_PYTHON` config variable, if set (\#2240).
+    * Handle `_COMPONENT_KEY_`-related errors in sudo/SLURM executor, to simplify testing (\#2245).
+    * Drop obsolete `SlurmJob.workflow_task_file_prefix` for both SSH/sudo executors (\#2245).
+    * Drop obsolete `keep_pickle_files` attribute from slurm executors (\#2246).
 * Dependencies:
     * Bump `uvicorn` version (\#2242).
+* Testing:
+    * Improve testing of sudo-Slurm executor (\#2245, \#2246).

 # 2.12.0

@@ -88,7 +88,6 @@ def __init__(
         self,
         num_tasks_tot: int,
         slurm_config: SlurmConfig,
-        workflow_task_file_prefix: Optional[str] = None,
         slurm_file_prefix: Optional[str] = None,
         wftask_file_prefixes: Optional[tuple[str, ...]] = None,
         single_task_submission: bool = False,
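For context on the hunk above: a minimal, hypothetical stand-in mirroring the updated `SlurmJob.__init__` parameters (the real class lives in the executor modules and carries more logic; `SlurmJobSketch` and all values below are illustrative only). The point is simply that callers stop passing the dropped keyword:

```python
from typing import Optional


# Toy stand-in for the updated signature; `object` replaces fractal-server's
# SlurmConfig here so the sketch stays self-contained.
class SlurmJobSketch:
    def __init__(
        self,
        num_tasks_tot: int,
        slurm_config: object,
        slurm_file_prefix: Optional[str] = None,
        wftask_file_prefixes: Optional[tuple[str, ...]] = None,
        single_task_submission: bool = False,
    ):
        self.num_tasks_tot = num_tasks_tot
        self.slurm_config = slurm_config
        self.slurm_file_prefix = slurm_file_prefix
        self.wftask_file_prefixes = wftask_file_prefixes
        self.single_task_submission = single_task_submission


# No `workflow_task_file_prefix` keyword any more:
job = SlurmJobSketch(
    num_tasks_tot=2,
    slurm_config=object(),
    slurm_file_prefix="batch-000001",  # made-up prefix
)
```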
fractal_server/app/runner/executors/slurm/ssh/executor.py (10 additions, 21 deletions)
@@ -62,7 +62,6 @@ class FractalSlurmSSHExecutor(SlurmExecutor):
         shutdown_file:
         python_remote: Equal to `settings.FRACTAL_SLURM_WORKER_PYTHON`
         wait_thread_cls: Class for waiting thread
-        keep_pickle_files:
         workflow_dir_local:
             Directory for both the cfut/SLURM and fractal-server files and logs
         workflow_dir_remote:
@@ -84,7 +83,6 @@ class FractalSlurmSSHExecutor(SlurmExecutor):
     python_remote: str

     wait_thread_cls = FractalSlurmWaitThread
-    keep_pickle_files: bool

     common_script_lines: list[str]
     slurm_account: Optional[str]
@@ -100,8 +98,6 @@ def __init__(
         # Folders and files
         workflow_dir_local: Path,
         workflow_dir_remote: Path,
-        # Runner options
-        keep_pickle_files: bool = False,
         # Monitoring options
         slurm_poll_interval: Optional[int] = None,
         # SLURM submission script options
@@ -120,7 +116,6 @@ def __init__(
             fractal_ssh:
             workflow_dir_local:
             workflow_dir_remote:
-            keep_pickle_files:
             slurm_poll_interval:
             common_script_lines:
             slurm_account:
@@ -194,7 +189,6 @@ def __init__(
             raise e

         # Set/initialize some more options
-        self.keep_pickle_files = keep_pickle_files
         self.map_jobid_to_slurm_files_local = {}

     def _validate_common_script_lines(self):
@@ -901,12 +895,11 @@ def _handle_remaining_jobs(
                 pass
         for job_id in remaining_job_ids:
             self._cleanup(job_id)
-        if not self.keep_pickle_files:
-            for job in remaining_jobs:
-                for path in job.output_pickle_files_local:
-                    path.unlink()
-                for path in job.input_pickle_files_local:
-                    path.unlink()
+        for job in remaining_jobs:
+            for path in job.output_pickle_files_local:
+                path.unlink()
+            for path in job.input_pickle_files_local:
+                path.unlink()

     def _completion(self, job_ids: list[str]) -> None:
         """
@@ -1001,8 +994,7 @@ def _completion(self, job_ids: list[str]) -> None:
                             f"Future {future} (SLURM job ID: {job_id}) "
                             "was already cancelled."
                         )
-                        if not self.keep_pickle_files:
-                            in_path.unlink()
+                        in_path.unlink()
                         self._cleanup(job_id)
                         self._handle_remaining_jobs(
                             remaining_futures=remaining_futures,
@@ -1062,17 +1054,15 @@ def _completion(self, job_ids: list[str]) -> None:
                             remaining_job_ids=remaining_job_ids,
                         )
                         return
-                        if not self.keep_pickle_files:
-                            out_path.unlink()
+                        out_path.unlink()
                     except InvalidStateError:
                         logger.warning(
                             f"Future {future} (SLURM job ID: {job_id}) was "
                             "already cancelled, exit from "
                             "FractalSlurmSSHExecutor._completion."
                         )
-                        if not self.keep_pickle_files:
-                            out_path.unlink()
-                            in_path.unlink()
+                        out_path.unlink()
+                        in_path.unlink()

                         self._cleanup(job_id)
                         self._handle_remaining_jobs(
@@ -1082,8 +1072,7 @@ def _completion(self, job_ids: list[str]) -> None:
                         return

                     # Clean up input pickle file
-                    if not self.keep_pickle_files:
-                        in_path.unlink()
+                    in_path.unlink()
                     self._cleanup(job_id)
                     if job.single_task_submission:
                         future.set_result(outputs[0])
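Since `keep_pickle_files` is gone, every completion/cleanup path above now unlinks the pickle files unconditionally. A standalone sketch of that pattern (not fractal-server code; function and variable names are illustrative):

```python
from pathlib import Path
from tempfile import TemporaryDirectory


def cleanup_pickle_files(
    input_pickles: list[Path], output_pickles: list[Path]
) -> None:
    # After \#2246 there is no keep-files debug flag: cleanup always runs.
    for path in (*input_pickles, *output_pickles):
        # missing_ok guards against paths already removed by another branch.
        path.unlink(missing_ok=True)


# Quick self-check with throwaway files:
with TemporaryDirectory() as tmp:
    in_path = Path(tmp) / "task.in.pickle"
    out_path = Path(tmp) / "task.out.pickle"
    in_path.touch()
    out_path.touch()
    cleanup_pickle_files([in_path], [out_path])
    assert not in_path.exists() and not out_path.exists()
```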
fractal_server/app/runner/executors/slurm/sudo/executor.py (13 additions, 14 deletions)
@@ -162,7 +162,6 @@ def __init__(
         self,
         num_tasks_tot: int,
         slurm_config: SlurmConfig,
-        workflow_task_file_prefix: Optional[str] = None,
         slurm_file_prefix: Optional[str] = None,
         wftask_file_prefixes: Optional[tuple[str, ...]] = None,
         single_task_submission: bool = False,
@@ -220,7 +219,6 @@ class FractalSlurmExecutor(SlurmExecutor):
     workflow_dir_local: Path
     workflow_dir_remote: Path
     map_jobid_to_slurm_files: dict[str, tuple[str, str, str]]
-    keep_pickle_files: bool
     slurm_account: Optional[str]
     jobs: dict[str, tuple[Future, SlurmJob]]

@@ -233,7 +231,6 @@ def __init__(
         user_cache_dir: Optional[str] = None,
         common_script_lines: Optional[list[str]] = None,
         slurm_poll_interval: Optional[int] = None,
-        keep_pickle_files: bool = False,
         slurm_account: Optional[str] = None,
         *args,
         **kwargs,
@@ -254,7 +251,6 @@ def __init__(
         # raised within `__init__`).
         self.wait_thread.shutdown_callback = self.shutdown

-        self.keep_pickle_files = keep_pickle_files
         self.slurm_user = slurm_user
         self.slurm_account = slurm_account

@@ -616,7 +612,14 @@ def _submit_job(
         _prefixes = []
         _subfolder_names = []
         for component in components:
-            actual_component = component.get(_COMPONENT_KEY_, None)
+            # In Fractal, `component` is a `dict` by construction (e.g.
+            # `component = {"zarr_url": "/something", "param": 1}`). The
+            # try/except covers the case of e.g. `executor.map([1, 2])`,
+            # which is useful for testing.
+            try:
+                actual_component = component.get(_COMPONENT_KEY_, None)
+            except AttributeError:
+                actual_component = str(component)
             _task_file_paths = get_task_file_paths(
                 workflow_dir_local=task_files.workflow_dir_local,
                 workflow_dir_remote=task_files.workflow_dir_remote,
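The comment in this hunk explains the design: components are dicts in production, but tests may pass plain values through `executor.map`. A self-contained sketch of the same fallback (the key's actual value is internal to fractal-server; the one below is made up):

```python
from typing import Any, Optional

_COMPONENT_KEY_ = "__component_key__"  # illustrative value, not the real one


def resolve_component(component: Any) -> Optional[str]:
    try:
        # Production path: `component` is a dict and may carry the key.
        return component.get(_COMPONENT_KEY_, None)
    except AttributeError:
        # Testing path, e.g. executor.map([1, 2]): no `.get` method.
        return str(component)


assert resolve_component({_COMPONENT_KEY_: "c0", "zarr_url": "/x"}) == "c0"
assert resolve_component({"zarr_url": "/x"}) is None
assert resolve_component(1) == "1"
```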
@@ -868,8 +871,7 @@ def _completion(self, jobid: str) -> None:
                     " cancelled, exit from"
                     " FractalSlurmExecutor._completion."
                 )
-                if not self.keep_pickle_files:
-                    in_path.unlink()
+                in_path.unlink()
                 self._cleanup(jobid)
                 return

@@ -911,23 +913,20 @@ def _completion(self, jobid: str) -> None:
                             exc = TaskExecutionError(proxy.tb, **kwargs)
                             fut.set_exception(exc)
                             return
-                if not self.keep_pickle_files:
-                    out_path.unlink()
+                out_path.unlink()
             except InvalidStateError:
                 logger.warning(
                     f"Future {fut} (SLURM job ID: {jobid}) was already"
                     " cancelled, exit from"
                     " FractalSlurmExecutor._completion."
                 )
-                if not self.keep_pickle_files:
-                    out_path.unlink()
-                    in_path.unlink()
+                out_path.unlink()
+                in_path.unlink()
                 self._cleanup(jobid)
                 return

             # Clean up input pickle file
-            if not self.keep_pickle_files:
-                in_path.unlink()
+            in_path.unlink()
             self._cleanup(jobid)
             if job.single_task_submission:
                 fut.set_result(outputs[0])
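Taken together, the `_completion` hunks above always remove both pickle files once a job's future is resolved. A rough, assumption-laden sketch of that lifecycle (the real method also handles SLURM files, proxies, and shutdown; the `(success, output)` payload format is an assumption of this sketch):

```python
import pickle
from concurrent.futures import Future
from pathlib import Path


def complete_future(fut: Future, in_path: Path, out_path: Path) -> None:
    with out_path.open("rb") as f:
        success, output = pickle.load(f)  # illustrative payload format
    # Unconditional cleanup, now that keep_pickle_files is gone:
    out_path.unlink()
    in_path.unlink()
    if success:
        fut.set_result(output)
    else:
        fut.set_exception(RuntimeError(str(output)))
```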
poetry.lock (21 additions, 23 deletions)

Some generated files are not rendered by default.