Merge pull request #48 from facebookresearch/dependent

Dependent jobs

adefossez authored Jul 7, 2023
2 parents 5b615d3 + 28f847e · commit 07e9746
Showing 12 changed files with 162 additions and 33 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/linter.yml
@@ -24,7 +24,8 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -U -r requirements.txt
pip install "pytorch_lightning<1.9"
pip install -r requirements.txt
pip install -e '.[dev]'
- name: Run tests
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
@@ -24,7 +24,8 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -U -r requirements.txt
pip install "pytorch_lightning<1.9"
pip install -r requirements.txt
pip install -e '.[dev]'
- name: Run tests
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.1.13] - TBD

Adding dependent jobs. E.g., use `launcher.slurm_(dependents=5)`. Incompatible with
job arrays.

## [0.1.12] - 2023-05-23

Fixed bug with PL (Thanks @kingjr).
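A grid file using the new option could look like the following sketch. It assumes the usual `Explorer`/`Launcher` grid-file pattern from the Dora README; only the `dependents` argument itself comes from this commit.

from dora import Explorer, Launcher

@Explorer
def explorer(launcher: Launcher):
    # One initial job plus 5 dependents chained with SLURM's afternotok:
    # each dependent starts only if the previous job did not complete,
    # replacing the usual requeue-on-preemption. Not usable with job arrays.
    launcher.slurm_(dependents=5)
    launcher()  # schedule the base experiment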
2 changes: 1 addition & 1 deletion dora/__init__.py
@@ -60,7 +60,7 @@
__pdoc__ = {}
__pdoc__['tests'] = False

__version__ = "0.1.12"
__version__ = "0.1.13a1"

# flake8: noqa
from .explore import Explorer, Launcher
7 changes: 5 additions & 2 deletions dora/conf.py
@@ -70,8 +70,10 @@ class SlurmConfig:
per node, otherwise, will schedule one task per gpu (default is False).
array_parallelism (int): when using job arrays, how many tasks can run
in parallel.
qos: (str or None): qos param for slurm.
account: (str or None): account param for slurm.
qos (str or None): qos param for slurm.
account (str or None): account param for slurm.
dependents (int): if > 0, start a number of dependent jobs. Requeuing
will be deactivated and rely on dependent jobs instead.
..warning:: this assumes one task per GPU.
Set `one_task_per_node` if you do not want that.
@@ -92,6 +94,7 @@ class SlurmConfig:
exclude: tp.Optional[str] = None
qos: tp.Optional[str] = None
account: tp.Optional[str] = None
dependents: int = 0


@dataclass
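At the config level, the new field sits next to the existing SLURM options. A small sketch (values are illustrative; `mem_per_gpu` is one of the surrounding dataclass fields):

from dora.conf import SlurmConfig

# dependents=3 submits the initial job plus 3 copies chained with
# `dependency=afternotok:<previous>`; requeuing is disabled on every
# job in the chain except the last one.
cfg = SlurmConfig(mem_per_gpu=40, dependents=3)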
12 changes: 7 additions & 5 deletions dora/grid.py
@@ -193,7 +193,7 @@ def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str,
log("Canceling all current jobs...")
for sheep in sheeps:
if sheep.job is not None:
shepherd.cancel_lazy(sheep.job)
shepherd.cancel_lazy(sheep=sheep)
shepherd.commit()
log("Deleting XP folders...")
for sheep in sheeps:
@@ -215,8 +215,10 @@ def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str,
jobs = try_load(job_file)
if jobs is not None:
job = jobs[0]
dependent_jobs = []
if len(jobs) == 3:
dependent_jobs = jobs[2]
log(f"Canceling job {job.job_id} from unloadable sheep {child.name}.")
shepherd.cancel_lazy(job)
shepherd.cancel_lazy(job, dependent_jobs)
else:
assert old_sheep is not None
old_sheeps.append(old_sheep)
@@ -241,7 +243,7 @@ def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str,
for old_sheep in old_sheeps:
if not old_sheep.is_done():
assert old_sheep.job is not None
shepherd.cancel_lazy(old_sheep.job)
shepherd.cancel_lazy(sheep=old_sheep)
name = main.get_name(old_sheep.xp)
log(f"Canceling job {old_sheep.job.job_id} for no longer required "
f"sheep {old_sheep.xp.sig}/{name}")
@@ -252,7 +254,7 @@ def run_grid(main: DecoratedMain, explorer: Explorer, grid_name: str,
assert sheep.job is not None
name = main.get_name(sheep.xp)
log(f"Canceling job {sheep.job.job_id} for sheep {sheep.xp.sig}/{name}")
shepherd.cancel_lazy(sheep.job)
shepherd.cancel_lazy(sheep=sheep)

if not args.dry_run:
for sheep in sheeps:
@@ -400,7 +402,7 @@ def monitor(args: tp.Any, main: DecoratedMain, explorer: Explorer, herd: tp.List
meta = {
'name': name,
'index': index,
'sid': sheep.job.job_id if sheep.job else '',
'sid': sheep.current_job_id or '', # i know 0 is a valid sid, but who cares.
'sig': sheep.xp.sig,
'state': state,
}
3 changes: 2 additions & 1 deletion dora/info.py
@@ -47,7 +47,8 @@ def info_action(args, main: DecoratedMain):
elif sheep.is_done():
log("Job is not running")
else:
sheep.job.cancel()
shepherd.cancel_lazy(sheep=sheep)
shepherd.commit()
if args.log:
if sheep.log is None:
fatal("No log, sheep hasn't been scheduled yet.")
2 changes: 1 addition & 1 deletion dora/launch.py
@@ -42,7 +42,7 @@ def launch_action(args, main: DecoratedMain):
if args.clear:
log("Canceling current job...")
if sheep.job is not None:
shepherd.cancel_lazy(sheep.job)
shepherd.cancel_lazy(sheep=sheep)
shepherd.commit()
log("Deleting XP folder...")
if sheep.xp.folder.exists():
5 changes: 4 additions & 1 deletion dora/lightning.py
@@ -16,7 +16,10 @@

from pytorch_lightning import LightningModule
from pytorch_lightning.callbacks import Callback
from pytorch_lightning.callbacks.progress import ProgressBarBase
try:
from pytorch_lightning.callbacks.progress import ProgressBarBase
except ImportError:
raise ImportError("Only pytorch_lightning <= 1.8 is supported.")
from pytorch_lightning.plugins.environments import ClusterEnvironment
from pytorch_lightning.trainer import Trainer
from pytorch_lightning.utilities.argparse import from_argparse_args
2 changes: 1 addition & 1 deletion dora/run.py
@@ -34,7 +34,7 @@ def check_job_and_clear(argv: tp.List[str], main: DecoratedMain, clear: bool = F
log(red(f"Found existing slurm job {job.job_id} with status {job.state}."))
if clear:
log("Cancelling the existing job.")
shepherd.cancel_lazy(sheep.job)
shepherd.cancel_lazy(sheep=sheep)
shepherd.commit()
time.sleep(3)
else:
135 changes: 116 additions & 19 deletions dora/shep.py
@@ -31,9 +31,18 @@
logger = logging.getLogger(__name__)


PreemptionCallback = tp.Callable[[], None]
_preemption_callbacks: tp.List[PreemptionCallback] = []


def register_preemption_callaback(callback: PreemptionCallback):
_preemption_callbacks.append(callback)


class _SubmitItTarget:
def __call__(self, main: DecoratedMain, argv: tp.Sequence[str]):
def __call__(self, main: DecoratedMain, argv: tp.Sequence[str], requeue: bool = True):
self.xp = main.get_xp(argv)
self.requeue = requeue
spec = get_distrib_spec()
# We export the RANK as it can be used to customize logging early on
# in the called program (e.g. using Hydra).
@@ -42,6 +51,12 @@ def __call__(self, main: DecoratedMain, argv: tp.Sequence[str]):
main()

def checkpoint(self, *args, **kwargs):
for callback in _preemption_callbacks:
callback()

if not self.requeue:
sys.exit(1) # let's exit early!

if get_distrib_spec().rank == 0:
# cleanup rendezvous file on requeue, otherwise things will fail.
if self.xp.rendezvous_file.exists():
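The `checkpoint` method above runs the registered callbacks at preemption time, before either exiting early or requeuing. A usage sketch, with a hypothetical callback body (`register_preemption_callaback` is the name added by this diff, committed spelling included):

from dora.shep import register_preemption_callaback

def flush_state():
    # Hypothetical: persist whatever the training loop has buffered
    # before SLURM tears the task down.
    print("preempted: flushing state")

register_preemption_callaback(flush_state)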
Expand All @@ -58,26 +73,33 @@ def __init__(self, xp: XP):
self.xp = xp
self.job: tp.Optional[submitit.SlurmJob] = None
# Other jobs contain the list of other jobs in the array
self._other_jobs: tp.Optional[tp.List[submitit.SlurmJob]] = None
self._other_jobs: tp.List[submitit.SlurmJob] = []
self._dependent_jobs: tp.List[submitit.SlurmJob] = []
if self._job_file.exists():
content = try_load(self._job_file)
if isinstance(content, tuple):
self.job, self._other_jobs = content
if len(content) == 2:
self.job, self._other_jobs = content
elif len(content) == 3:
self.job, self._other_jobs, self._dependent_jobs = content
else:
raise RuntimeError("Invalid content for job file.")
else:
self.job = content

@property
def _job_file(self) -> Path:
return self.xp.folder / self.xp.dora.shep.job_file

def state(self, mode="standard"):
@staticmethod
def _get_state(job, other_jobs=[], mode="standard"):
"""Return the current state of the `Sheep`.
"""
if self.job is None:
if job is None:
return None
state = self.job.watcher.get_state(self.job.job_id, mode)
if state == 'UNKNOWN' and self._other_jobs:
if any(job.state != 'UNKNOWN' for job in self._other_jobs):
state = job.watcher.get_state(job.job_id, mode)
if state == 'UNKNOWN' and other_jobs:
if any(job.state != 'UNKNOWN' for job in other_jobs):
# When cancelling single entries in a job array,
# sacct will just completely forget about it instead of marking
# it as cancelled. So we use a specific 'MISSING' status to handle that.
@@ -86,26 +108,70 @@ def state(self, mode="standard"):
return 'CANCELLED'
return state

@staticmethod
def _is_done(job, mode="standard"):
"""Return True if the job is no longer running on the cluster.
"""
if job is None:
return True
return job.watcher.is_done(job.job_id, mode)

def is_done(self, mode="standard"):
"""Return True if the job is no longer running on the cluster.
"""
if self.job is None:
return True
return self.job.watcher.is_done(self.job.job_id, mode)
if self._dependent_jobs:
assert len(self._other_jobs) <= 1
chain = [self.job] + self._dependent_jobs
for job in chain:
if not job.watcher.is_done(job.job_id, mode):
return False
return True
else:
return self.job.watcher.is_done(self.job.job_id, mode)

def state(self, mode="standard"):
if self._dependent_jobs:
assert self.job is not None
assert len(self._other_jobs) <= 1
chain = [self.job] + self._dependent_jobs
for job in chain:
state = Sheep._get_state(job, [], mode)
if state == 'COMPLETED' or not Sheep._is_done(job, mode):
return state
return state
else:
return self._get_state(self.job, self._other_jobs, mode)

def _log(self, job_id: str) -> Path:
return self.xp.submitit / f"{job_id}_0_log.out"

@property
def current_job_id(self) -> tp.Optional[str]:
"""Return the current job id, especially useful when using dependent jobs.
"""
if self.job is None:
return None
job_id = self.job.job_id
# We use the logs to be low tech and not require SLURM.
for job in self._dependent_jobs:
if self._log(job.job_id).exists():
job_id = job.job_id
return job_id

@property
def log(self):
"""Return the path to the main log.
"""
if self.job is not None:
return self.xp.submitit / f"{self.job.job_id}_0_log.out"
return None
if self.job is None:
return None
return self._log(self.current_job_id)

def __repr__(self):
out = f"Sheep({self.xp.sig}, state={self.state()}, "
if self.job is not None:
out += f"sid={self.job.job_id}, "

out += f"sid={self.current_job_id}, "
out += f"argv={self.xp.argv})"
return out

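A worked example of the chain walk in `state()` above, with illustrative job states:

# chain = [job] + dependent_jobs
#   job        FAILED   -> done and not COMPLETED, keep walking
#   dependent  RUNNING  -> not done, returned immediately: 'RUNNING'
#   dependent  PENDING  -> never inspected
# If every job in the chain is done and none is COMPLETED, the state
# of the last job is returned.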
@@ -212,7 +278,7 @@ def maybe_submit_lazy(self, sheep: Sheep, slurm_config: SlurmConfig, rules: Subm
else:
if rules.replace:
logger.debug(f"Cancelling previous job {sheep.job.job_id} with status {state}")
self.cancel_lazy(sheep.job)
self.cancel_lazy(sheep=sheep)
sheep.job = None

if sheep.job is None:
@@ -221,11 +287,21 @@ def maybe_submit_lazy(self, sheep: Sheep, slurm_config: SlurmConfig, rules: Subm
assert slurm_config == self._to_submit[-1].slurm_config
self._to_submit[-1].sheeps.append(sheep)

def cancel_lazy(self, job: submitit.SlurmJob):
def cancel_lazy(self, job: tp.Optional[submitit.SlurmJob] = None,
dependent_jobs: tp.Sequence[submitit.SlurmJob] = [],
sheep: tp.Optional[Sheep] = None):
"""
Cancel a job. The job is actually cancelled only when `commit()` is called.
You can either provide manually both a job and its dependents, or a sheep that
will be automatically processed.
"""
self._to_cancel.append(job)
if job is None:
assert sheep is not None
if sheep.job is not None:
self._to_cancel += [sheep.job] + list(sheep._dependent_jobs)
else:
assert sheep is None
self._to_cancel += [job] + list(dependent_jobs)
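Schematically, the two call styles the new signature supports (`deps` stands for an illustrative list of dependent jobs):

shepherd.cancel_lazy(sheep=sheep)                # the sheep supplies its job and dependents
shepherd.cancel_lazy(job, dependent_jobs=deps)   # or pass both explicitly
shepherd.commit()                                # cancellations only happen here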

def commit(self):
"""
@@ -290,6 +366,7 @@ def _get_submitit_executor(self, name: str, folder: Path,
del kwargs['mem_per_gpu']
del kwargs['cpus_per_gpu']
del kwargs['one_task_per_node']
del kwargs['dependents']
logger.debug("Slurm parameters %r", kwargs)

executor.update_parameters(
@@ -335,6 +412,10 @@ def _submit(self, job_array: _JobArray):
assert all(other.xp.dora.git_save == use_git_save for other in sheeps), \
"All jobs inside an array must have the same value for git_save."""

requeue = True
if slurm_config.dependents:
assert not is_array, "Cannot use dependent jobs and job arrays"
requeue = False
if is_array:
name_sig = _get_sig(sorted([sheep.xp.sig for sheep in sheeps]))
else:
@@ -371,14 +452,30 @@ def _submit(self, job_array: _JobArray):
if use_git_save:
assert self._existing_git_clone is not None
git_save.assign_clone(sheep.xp, self._existing_git_clone)
jobs.append(executor.submit(_SubmitItTarget(), self.main, sheep.xp.argv))
jobs.append(executor.submit(
_SubmitItTarget(), self.main, sheep.xp.argv, requeue))
if slurm_config.dependents:
assert len(job_array.sheeps) == 1
for dep_index in range(slurm_config.dependents):
requeue = dep_index == slurm_config.dependents - 1
last_job_id = jobs[-1].job_id
executor.update_parameters(
additional_parameters={'dependency': f"afternotok:{last_job_id}"})
jobs.append(executor.submit(
_SubmitItTarget(), self.main, sheep.xp.argv, requeue))
dependent_jobs = []
if slurm_config.dependents:
dependent_jobs = jobs[1:]
jobs = jobs[:1]

# Now we can access jobs
for sheep, job in zip(sheeps, jobs):
# See comment in `Sheep.state` function above for storing all jobs in the array.
pickle.dump((job, jobs), open(sheep._job_file, "wb"))
pickle.dump((job, jobs, dependent_jobs), open(sheep._job_file, "wb"))
logger.debug("Created job with id %s", job.job_id)
sheep.job = job # type: ignore
sheep._other_jobs = jobs # type: ignore
sheep._dependent_jobs = dependent_jobs # type: ignore
link = self._by_id / job.job_id
link.symlink_to(sheep.xp.folder.resolve())
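The net effect of the `_submit` changes, schematically (job ids invented for illustration):

# With dependents=2, a single sheep yields three submissions:
#   1001                                requeue=False
#   1002  dependency=afternotok:1001    requeue=False
#   1003  dependency=afternotok:1002    requeue=True  (last in the chain)
# The job file then stores (job=1001, jobs=[1001], dependent_jobs=[1002, 1003]).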
(The diff for the remaining file did not load and is not shown.)