From eeb7ff4a197ea4ce850db2e07f2f5b33819d1796 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20D=C3=A9fossez?= Date: Tue, 23 May 2023 16:26:40 +0200 Subject: [PATCH 1/7] WIP --- dora/conf.py | 7 +++++-- dora/shep.py | 56 ++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/dora/conf.py b/dora/conf.py index ab0611b..5da675c 100644 --- a/dora/conf.py +++ b/dora/conf.py @@ -70,8 +70,10 @@ class SlurmConfig: per node, otherwise, will schedule one task per gpu (default is False). array_parallelism (int): when using job arrays, how many tasks can run in parallel. - qos: (str or None): qos param for slurm. - account: (str or None): account param for slurm. + qos (str or None): qos param for slurm. + account (str or None): account param for slurm. + dependents (int): if > 0, start a number of dependent jobs. Requeuing + will be deactivated and rely on dependent jobs instead. ..warning:: this assumes one task per GPU. Set `one_task_per_node` if you do not want that. @@ -92,6 +94,7 @@ class SlurmConfig: exclude: tp.Optional[str] = None qos: tp.Optional[str] = None account: tp.Optional[str] = None + dependents: int = 0 @dataclass diff --git a/dora/shep.py b/dora/shep.py index 25ba19f..fe5a65c 100644 --- a/dora/shep.py +++ b/dora/shep.py @@ -31,9 +31,18 @@ logger = logging.getLogger(__name__) +PreemptionCallback = tp.Callable[[], None] +_preemption_callbacks: tp.List[PreemptionCallback] = [] + + +def register_preemption_callaback(callback: PreemptionCallback): + _preemption_callbacks.append(callback) + + class _SubmitItTarget: - def __call__(self, main: DecoratedMain, argv: tp.Sequence[str]): + def __call__(self, main: DecoratedMain, argv: tp.Sequence[str], requeue: bool = True): self.xp = main.get_xp(argv) + self.requeue = requeue spec = get_distrib_spec() # We export the RANK as it can be used to customize logging early on # in the called program (e.g. using Hydra). 
@@ -42,6 +51,12 @@ def __call__(self, main: DecoratedMain, argv: tp.Sequence[str]): main() def checkpoint(self, *args, **kwargs): + for callback in _preemption_callbacks: + callback() + + if not self.requeue: + return + if get_distrib_spec().rank == 0: # cleanup rendezvous file on requeue, otherwise things will fail. if self.xp.rendezvous_file.exists(): @@ -59,10 +74,16 @@ def __init__(self, xp: XP): self.job: tp.Optional[submitit.SlurmJob] = None # Other jobs contain the list of other jobs in the array self._other_jobs: tp.Optional[tp.List[submitit.SlurmJob]] = None + self._dependent_jobs: tp.List[submitit.SlurmJob] = [] if self._job_file.exists(): content = try_load(self._job_file) if isinstance(content, tuple): - self.job, self._other_jobs = content + if len(content) == 2: + self.job, self._other_jobs = content + elif len(content) == 3: + self.job, self._other_jobs, self._dependent_jobs = content + else: + raise RuntimeError("Invalid content for job file.") else: self.job = content @@ -70,14 +91,15 @@ def __init__(self, xp: XP): def _job_file(self) -> Path: return self.xp.folder / self.xp.dora.shep.job_file - def state(self, mode="standard"): + @staticmethod + def _get_state(job, other_jobs=[], mode="standard"): """Return the current state of the `Sheep`. """ - if self.job is None: + if job is None: return None - state = self.job.watcher.get_state(self.job.job_id, mode) - if state == 'UNKNOWN' and self._other_jobs: - if any(job.state != 'UNKNOWN' for job in self._other_jobs): + state = job.watcher.get_state(job.job_id, mode) + if state == 'UNKNOWN' and other_jobs: + if any(job.state != 'UNKNOWN' for job in other_jobs): # When cancelling single entries in a job array, # sacct will just completely forget about it insted of marking # it as cancelled. So we use a specific 'MISSING' status to handle that. 
@@ -86,12 +108,24 @@ def state(self, mode="standard"): return 'CANCELLED' return state - def is_done(self, mode="standard"): + @staticmethod + def _is_done(self, job, mode="standard"): """Return True if the job is no longer running on the cluster. """ - if self.job is None: + if job is None: return True - return self.job.watcher.is_done(self.job.job_id, mode) + return job.watcher.is_done(job.job_id, mode) + + def state(self, mode="standard"): + if self._dependent_jobs: + chain = self.job + [self._dependent_jobs] + for job in chain: + state = Sheep._get_state(job, [], mode) + if state == 'COMPLETED' or not Sheep._is_done(job, mode): + return state + return state + else: + return self._get_state(self.job, self._other_jobs, mode) @property def log(self): @@ -335,6 +369,8 @@ def _submit(self, job_array: _JobArray): assert all(other.xp.dora.git_save == use_git_save for other in sheeps), \ "All jobs inside an array must have the same value for git_save.""" + if slurm_config.dependents: + assert not slurm_config.is_array, "Cannot use dependent jobs and job arrays" if is_array: name_sig = _get_sig(sorted([sheep.xp.sig for sheep in sheeps])) else: From 9813fbf91bee43a37c9ac18b5c8c30dc6c99aaf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20D=C3=A9fossez?= Date: Fri, 7 Jul 2023 11:47:49 +0200 Subject: [PATCH 2/7] initial implementation --- dora/grid.py | 10 +++++---- dora/launch.py | 2 +- dora/run.py | 2 +- dora/shep.py | 49 +++++++++++++++++++++++++++++++++-------- dora/tests/test_shep.py | 16 ++++++++++++++ 5 files changed, 64 insertions(+), 15 deletions(-) diff --git a/dora/grid.py b/dora/grid.py index 5c22724..212e226 100644 --- a/dora/grid.py +++ b/dora/grid.py @@ -193,7 +193,7 @@ def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str, log("Canceling all current jobs...") for sheep in sheeps: if sheep.job is not None: - shepherd.cancel_lazy(sheep.job) + shepherd.cancel_lazy(sheep=sheep) shepherd.commit() log("Deleting XP folders...") for sheep in 
sheeps: @@ -215,8 +215,10 @@ def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str, jobs = try_load(job_file) if jobs is not None: job = jobs[0] + if len(jobs) == 3: + dependent_jobs = jobs[2] log(f"Canceling job {job.job_id} from unloadable sheep {child.name}.") - shepherd.cancel_lazy(job) + shepherd.cancel_lazy(job, dependent_jobs) else: assert old_sheep is not None old_sheeps.append(old_sheep) @@ -241,7 +243,7 @@ def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str, for old_sheep in old_sheeps: if not old_sheep.is_done(): assert old_sheep.job is not None - shepherd.cancel_lazy(old_sheep.job) + shepherd.cancel_lazy(sheep=old_sheep) name = main.get_name(old_sheep.xp) log(f"Canceling job {old_sheep.job.job_id} for no longer required " f"sheep {old_sheep.xp.sig}/{name}") @@ -252,7 +254,7 @@ def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str, assert sheep.job is not None name = main.get_name(sheep.xp) log(f"Canceling job {sheep.job.job_id} for sheep {sheep.xp.sig}/{name}") - shepherd.cancel_lazy(sheep.job) + shepherd.cancel_lazy(sheep=sheep) if not args.dry_run: for sheep in sheeps: diff --git a/dora/launch.py b/dora/launch.py index 59a6a4e..5364c07 100644 --- a/dora/launch.py +++ b/dora/launch.py @@ -42,7 +42,7 @@ def launch_action(args, main: DecoratedMain): if args.clear: log("Canceling current job...") if sheep.job is not None: - shepherd.cancel_lazy(sheep.job) + shepherd.cancel_lazy(sheep=sheep) shepherd.commit() log("Deleting XP folder...") if sheep.xp.folder.exists(): diff --git a/dora/run.py b/dora/run.py index cc1d8c4..c3fb934 100644 --- a/dora/run.py +++ b/dora/run.py @@ -34,7 +34,7 @@ def check_job_and_clear(argv: tp.List[str], main: DecoratedMain, clear: bool = F log(red(f"Found existing slurm job {job.job_id} with status {job.state}.")) if clear: log("Cancelling the existing job.") - shepherd.cancel_lazy(sheep.job) + shepherd.cancel_lazy(sheep=sheep) shepherd.commit() time.sleep(3) else: diff --git 
a/dora/shep.py b/dora/shep.py index fe5a65c..3ce8382 100644 --- a/dora/shep.py +++ b/dora/shep.py @@ -131,15 +131,22 @@ def state(self, mode="standard"): def log(self): """Return the path to the main log. """ - if self.job is not None: - return self.xp.submitit / f"{self.job.job_id}_0_log.out" - return None + if self.job is None: + return None + path = self.xp.submitit / f"{self.job.job_id}_0_log.out" + for job in self._dependent_jobs: + new_path = self.xp.submitit / f"{job.job_id}_0_log.out" + if new_path.exists(): + path = new_path + return path def __repr__(self): out = f"Sheep({self.xp.sig}, state={self.state()}, " if self.job is not None: out += f"sid={self.job.job_id}, " - + if self._dependent_jobs is not None: + deps = ",".join(job.job_id for job in self._dependent_jobs) + out += f"deps={deps}, " out += f"argv={self.xp.argv})" return out @@ -246,7 +253,7 @@ def maybe_submit_lazy(self, sheep: Sheep, slurm_config: SlurmConfig, rules: Subm else: if rules.replace: logger.debug(f"Cancelling previous job {sheep.job.job_id} with status {state}") - self.cancel_lazy(sheep.job) + self.cancel_lazy(sheep=sheep) sheep.job = None if sheep.job is None: @@ -255,11 +262,18 @@ def maybe_submit_lazy(self, sheep: Sheep, slurm_config: SlurmConfig, rules: Subm assert slurm_config == self._to_submit[-1].slurm_config self._to_submit[-1].sheeps.append(sheep) - def cancel_lazy(self, job: submitit.SlurmJob): + def cancel_lazy(self, job: tp.Optional[submitit.SlurmJob] = None, + dependent_jobs: tp.Sequential[submitit.SlurmJob] = [], + sheep: Sheep = None): """ Cancel a job. The job is actually cancelled only when `commit()` is called. 
""" - self._to_cancel.append(job) + if job is None: + assert sheep is not None + self._to_cancel += [sheep.job] + list(sheep._dependent_jobs) + else: + assert sheep is None + self._to_cancel += [job] + list(dependent_jobs) def commit(self): """ @@ -369,8 +383,10 @@ def _submit(self, job_array: _JobArray): assert all(other.xp.dora.git_save == use_git_save for other in sheeps), \ "All jobs inside an array must have the same value for git_save.""" + requeue = True if slurm_config.dependents: assert not slurm_config.is_array, "Cannot use dependent jobs and job arrays" + requeue = False if is_array: name_sig = _get_sig(sorted([sheep.xp.sig for sheep in sheeps])) else: @@ -407,14 +423,29 @@ def _submit(self, job_array: _JobArray): if use_git_save: assert self._existing_git_clone is not None git_save.assign_clone(sheep.xp, self._existing_git_clone) - jobs.append(executor.submit(_SubmitItTarget(), self.main, sheep.xp.argv)) + jobs.append(executor.submit( + _SubmitItTarget(), self.main, sheep.xp.argv, requeue)) + if slurm_config.dependents: + assert len(job_array.sheeps) == 1 + for dep_index in range(slurm_config.dependents): + requeue = dep_index == slurm_config.dependents - 1 + last_job_id = jobs[-1].job_id + executor.update_parameters(dependency=f"afternotok:{last_job_id}") + jobs.append(executor.submit( + _SubmitItTarget(), self.main, sheep.xp.argv, requeue)) + dependent_jobs = [] + if slurm_config.dependents: + dependent_jobs = jobs[1:] + jobs = jobs[:1] + # Now we can access jobs for sheep, job in zip(sheeps, jobs): # See commment in `Sheep.state` function above for storing all jobs in the array. 
- pickle.dump((job, jobs), open(sheep._job_file, "wb")) + pickle.dump((job, jobs, dependent_jobs), open(sheep._job_file, "wb")) logger.debug("Created job with id %s", job.job_id) sheep.job = job # type: ignore sheep._other_jobs = jobs # type: ignore + sheep._dependent_jobs = dependent_jobs # type: ignore link = self._by_id / job.job_id link = link link.symlink_to(sheep.xp.folder.resolve()) diff --git a/dora/tests/test_shep.py b/dora/tests/test_shep.py index be525f2..86d8ea8 100644 --- a/dora/tests/test_shep.py +++ b/dora/tests/test_shep.py @@ -73,3 +73,19 @@ def test_shep(tmpdir): shepherd.commit() assert sheep.xp.code_folder.name == 'code' assert sheep.xp.code_folder.exists() + + +def test_dependent(tmpdir): + with mock_shep(): + main = get_main(tmpdir) + shepherd = Shepherd(main) + slurm = main.get_slurm_config() + + sheep = shepherd.get_sheep_from_argv([]) + slurm.dependents = 2 + shepherd._submit(_JobArray(slurm, [sheep])) + assert sheep.job is not None + assert sheep.job.job_id == "0" + assert len(sheep._dependent_jobs) == 2 + assert sheep._dependent_jobs[1].job_id == "1" + assert sheep._dependent_jobs[2].job_id == "1" From 0c782a720fdee63336adfa7928b736c3b80fa827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20D=C3=A9fossez?= Date: Fri, 7 Jul 2023 11:57:19 +0200 Subject: [PATCH 3/7] linter --- CHANGELOG.md | 4 ++++ dora/__init__.py | 2 +- dora/shep.py | 35 ++++++++++++++++++++++++++++------- dora/tests/test_shep.py | 4 ++-- 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 20c0e0f..e3360b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## [0.1.13] - TBD + +Adding dependent jobs. + ## [0.1.12] - 2023-05-23 Fixed bug with PL (Thanks @kingjr). 
diff --git a/dora/__init__.py b/dora/__init__.py index 4d232fb..58fef5e 100644 --- a/dora/__init__.py +++ b/dora/__init__.py @@ -60,7 +60,7 @@ __pdoc__ = {} __pdoc__['tests'] = False -__version__ = "0.1.12" +__version__ = "0.1.13a1" # flake8: noqa from .explore import Explorer, Launcher diff --git a/dora/shep.py b/dora/shep.py index 3ce8382..e12f66d 100644 --- a/dora/shep.py +++ b/dora/shep.py @@ -55,7 +55,7 @@ def checkpoint(self, *args, **kwargs): callback() if not self.requeue: - return + sys.exit(1) # let's exit early! if get_distrib_spec().rank == 0: # cleanup rendezvous file on requeue, otherwise things will fail. @@ -73,7 +73,7 @@ def __init__(self, xp: XP): self.xp = xp self.job: tp.Optional[submitit.SlurmJob] = None # Other jobs contain the list of other jobs in the array - self._other_jobs: tp.Optional[tp.List[submitit.SlurmJob]] = None + self._other_jobs: tp.List[submitit.SlurmJob] = [] self._dependent_jobs: tp.List[submitit.SlurmJob] = [] if self._job_file.exists(): content = try_load(self._job_file) @@ -116,8 +116,24 @@ def _is_done(self, job, mode="standard"): return True return job.watcher.is_done(job.job_id, mode) + def is_done(self, mode="standard"): + """Return True if the job is no longer running on the cluster. 
+ """ + if self.job is None: + return True + if self._dependent_jobs: + assert len(self._other_jobs) == 0 + chain = self.job + [self._dependent_jobs] + for job in chain: + if not job.watcher.is_done(job.job_id, mode): + return False + return True + else: + return self.job.watcher.is_done(self.job.job_id, mode) + def state(self, mode="standard"): if self._dependent_jobs: + assert len(self._other_jobs) == 0 chain = self.job + [self._dependent_jobs] for job in chain: state = Sheep._get_state(job, [], mode) @@ -263,14 +279,17 @@ def maybe_submit_lazy(self, sheep: Sheep, slurm_config: SlurmConfig, rules: Subm self._to_submit[-1].sheeps.append(sheep) def cancel_lazy(self, job: tp.Optional[submitit.SlurmJob] = None, - dependent_jobs: tp.Sequential[submitit.SlurmJob] = [], - sheep: Sheep = None): + dependent_jobs: tp.Sequence[submitit.SlurmJob] = [], + sheep: tp.Optional[Sheep] = None): """ Cancel a job. The job is actually cancelled only when `commit()` is called. + You can either provide manually both a job and its dependents, or a sheep that + will be automatically processed. 
""" if job is None: assert sheep is not None - self._to_cancel += [sheep.job] + list(sheep._dependent_jobs) + if sheep.job is not None: + self._to_cancel += [sheep.job] + list(sheep._dependent_jobs) else: assert sheep is None self._to_cancel += [job] + list(dependent_jobs) @@ -385,7 +404,7 @@ def _submit(self, job_array: _JobArray): requeue = True if slurm_config.dependents: - assert not slurm_config.is_array, "Cannot use dependent jobs and job arrays" + assert not is_array, "Cannot use dependent jobs and job arrays" requeue = False if is_array: name_sig = _get_sig(sorted([sheep.xp.sig for sheep in sheeps])) @@ -434,9 +453,11 @@ def _submit(self, job_array: _JobArray): jobs.append(executor.submit( _SubmitItTarget(), self.main, sheep.xp.argv, requeue)) dependent_jobs = [] + other_jobs = jobs if slurm_config.dependents: dependent_jobs = jobs[1:] jobs = jobs[:1] + other_jobs = [] # Now we can access jobs for sheep, job in zip(sheeps, jobs): @@ -444,7 +465,7 @@ def _submit(self, job_array: _JobArray): pickle.dump((job, jobs, dependent_jobs), open(sheep._job_file, "wb")) logger.debug("Created job with id %s", job.job_id) sheep.job = job # type: ignore - sheep._other_jobs = jobs # type: ignore + sheep._other_jobs = other_jobs # type: ignore sheep._dependent_jobs = dependent_jobs # type: ignore link = self._by_id / job.job_id link = link diff --git a/dora/tests/test_shep.py b/dora/tests/test_shep.py index 86d8ea8..f3f0369 100644 --- a/dora/tests/test_shep.py +++ b/dora/tests/test_shep.py @@ -87,5 +87,5 @@ def test_dependent(tmpdir): assert sheep.job is not None assert sheep.job.job_id == "0" assert len(sheep._dependent_jobs) == 2 - assert sheep._dependent_jobs[1].job_id == "1" - assert sheep._dependent_jobs[2].job_id == "1" + assert sheep._dependent_jobs[0].job_id == "1" + assert sheep._dependent_jobs[1].job_id == "2" From 7610b6c739cc07e7752963bc35e1e6e8bd5ca57b Mon Sep 17 00:00:00 2001 From: Alexandre Defossez Date: Fri, 7 Jul 2023 03:49:22 -0700 Subject: [PATCH 4/7] 
fixes --- dora/grid.py | 2 +- dora/info.py | 3 ++- dora/shep.py | 47 ++++++++++++++++++++++++++++------------------- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/dora/grid.py b/dora/grid.py index 212e226..4c4febb 100644 --- a/dora/grid.py +++ b/dora/grid.py @@ -402,7 +402,7 @@ def monitor(args: tp.Any, main: DecoratedMain, explorer: Explorer, herd: tp.List meta = { 'name': name, 'index': index, - 'sid': sheep.job.job_id if sheep.job else '', + 'sid': sheep.current_job_id or '', # i know 0 is a valid sid, but who cares. 'sig': sheep.xp.sig, 'state': state, } diff --git a/dora/info.py b/dora/info.py index 1a79c85..18833f3 100644 --- a/dora/info.py +++ b/dora/info.py @@ -47,7 +47,8 @@ def info_action(args, main: DecoratedMain): elif sheep.is_done(): log("Job is not running") else: - sheep.job.cancel() + shepherd.cancel_lazy(sheep=sheep) + shepherd.commit() if args.log: if sheep.log is None: fatal("No log, sheep hasn't been scheduled yet.") diff --git a/dora/shep.py b/dora/shep.py index 0330fb0..e0ba1e7 100644 --- a/dora/shep.py +++ b/dora/shep.py @@ -109,7 +109,7 @@ def _get_state(job, other_jobs=[], mode="standard"): return state @staticmethod - def _is_done(self, job, mode="standard"): + def _is_done(job, mode="standard"): """Return True if the job is no longer running on the cluster. 
""" if job is None: @@ -122,8 +122,8 @@ def is_done(self, mode="standard"): if self.job is None: return True if self._dependent_jobs: - assert len(self._other_jobs) == 0 - chain = self.job + [self._dependent_jobs] + assert len(self._other_jobs) <= 1 + chain = [self.job] + self._dependent_jobs for job in chain: if not job.watcher.is_done(job.job_id, mode): return False @@ -133,8 +133,9 @@ def is_done(self, mode="standard"): def state(self, mode="standard"): if self._dependent_jobs: - assert len(self._other_jobs) == 0 - chain = self.job + [self._dependent_jobs] + assert self.job is not None + assert len(self._other_jobs) <= 1 + chain = [self.job] + self._dependent_jobs for job in chain: state = Sheep._get_state(job, [], mode) if state == 'COMPLETED' or not Sheep._is_done(job, mode): @@ -143,26 +144,34 @@ def state(self, mode="standard"): else: return self._get_state(self.job, self._other_jobs, mode) + def _log(self, job_id: str) -> Path: + return self.xp.submitit / f"{job_id}_0_log.out" + + @property + def current_job_id(self) -> tp.Optional[str]: + """Return the current job id, especially useful when using dependent jobs. + """ + if self.job is None: + return None + job_id = self.job.job_id + # We use the logs to be low tech and not require SLURM. + for job in self._dependent_jobs: + if self._log(job.job_id).exists(): + job_id = job.job_id + return job_id + @property def log(self): """Return the path to the main log. 
""" if self.job is None: return None - path = self.xp.submitit / f"{self.job.job_id}_0_log.out" - for job in self._dependent_jobs: - new_path = self.xp.submitit / f"{job.job_id}_0_log.out" - if new_path.exists(): - path = new_path - return path + return self._log(self.current_job_id) def __repr__(self): out = f"Sheep({self.xp.sig}, state={self.state()}, " if self.job is not None: - out += f"sid={self.job.job_id}, " - if self._dependent_jobs is not None: - deps = ",".join(job.job_id for job in self._dependent_jobs) - out += f"deps={deps}, " + out += f"sid={self.current_job_id}, " out += f"argv={self.xp.argv})" return out @@ -357,6 +366,7 @@ def _get_submitit_executor(self, name: str, folder: Path, del kwargs['mem_per_gpu'] del kwargs['cpus_per_gpu'] del kwargs['one_task_per_node'] + del kwargs['dependents'] logger.debug("Slurm parameters %r", kwargs) executor.update_parameters( @@ -449,15 +459,14 @@ def _submit(self, job_array: _JobArray): for dep_index in range(slurm_config.dependents): requeue = dep_index == slurm_config.dependents - 1 last_job_id = jobs[-1].job_id - executor.update_parameters(dependency=f"afternotok:{last_job_id}") + executor.update_parameters( + additional_parameters={'dependency': f"afternotok:{last_job_id}"}) jobs.append(executor.submit( _SubmitItTarget(), self.main, sheep.xp.argv, requeue)) dependent_jobs = [] - other_jobs = jobs if slurm_config.dependents: dependent_jobs = jobs[1:] jobs = jobs[:1] - other_jobs = [] # Now we can access jobs for sheep, job in zip(sheeps, jobs): @@ -465,7 +474,7 @@ def _submit(self, job_array: _JobArray): pickle.dump((job, jobs, dependent_jobs), open(sheep._job_file, "wb")) logger.debug("Created job with id %s", job.job_id) sheep.job = job # type: ignore - sheep._other_jobs = other_jobs # type: ignore + sheep._other_jobs = jobs # type: ignore sheep._dependent_jobs = dependent_jobs # type: ignore link = self._by_id / job.job_id link = link From 6cab9921dbf343f9e66343f6bbb8e4b1456386e8 Mon Sep 17 00:00:00 2001 
From: Alexandre Defossez Date: Fri, 7 Jul 2023 03:49:55 -0700 Subject: [PATCH 5/7] doc --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e3360b1..5868173 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [0.1.13] - TBD -Adding dependent jobs. +Adding dependent jobs. E.g., use `launcher.slurm_(dependents=5)`. Incompatible with +job arrays. ## [0.1.12] - 2023-05-23 From 84138fd5f7ca02fc89cbf2dde57866e841bd23e9 Mon Sep 17 00:00:00 2001 From: Alexandre Defossez Date: Fri, 7 Jul 2023 05:16:37 -0700 Subject: [PATCH 6/7] error --- dora/lightning.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dora/lightning.py b/dora/lightning.py index c0b3f78..a832500 100644 --- a/dora/lightning.py +++ b/dora/lightning.py @@ -16,7 +16,10 @@ from pytorch_lightning import LightningModule from pytorch_lightning.callbacks import Callback -from pytorch_lightning.callbacks.progress import ProgressBarBase +try: + from pytorch_lightning.callbacks.progress import ProgressBarBase +except ImportError: + raise ImportError("Only pytorch_lightning <= 1.8 is supported.") from pytorch_lightning.plugins.environments import ClusterEnvironment from pytorch_lightning.trainer import Trainer from pytorch_lightning.utilities.argparse import from_argparse_args From 28f847e61dda9c962b97eb615311685c805ec8d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20D=C3=A9fossez?= Date: Fri, 7 Jul 2023 14:18:33 +0200 Subject: [PATCH 7/7] fix ci --- .github/workflows/linter.yml | 3 ++- .github/workflows/tests.yml | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 5c9da33..076ab26 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -24,7 +24,8 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip -
pip install -U -r requirements.txt + pip install "pytorch_lightning<1.9" + pip install -r requirements.txt pip install -e '.[dev]' - name: Run tests diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 18e3249..53b5ea9 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -24,7 +24,8 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -U -r requirements.txt + pip install "pytorch_lightning<1.9" + pip install -r requirements.txt pip install -e '.[dev]' - name: Run tests