Skip to content

Commit

Permalink
Sort keys and fold resubmit steps by virtue of dflow (#269)
Browse files Browse the repository at this point in the history
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Enhanced workflow management with a new approach for handling workflow
steps and resubmission keys.
- Streamlined error handling to improve workflow integrity during
resubmission.

- **Bug Fixes**
	- Improved clarity and efficiency in the workflow management system.

- **Tests**
- Updated tests to align with new workflow logic, replacing old key
folding tests with resubmission key validation.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: zjgemi <liuxin_zijian@163.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
zjgemi and pre-commit-ci[bot] authored Oct 29, 2024
1 parent 35e0b97 commit 067c60f
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 239 deletions.
173 changes: 40 additions & 133 deletions dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,136 +808,43 @@ def print_list_steps(
return "\n".join(ret)


def successful_step_keys(wf):
all_step_keys = []
steps = wf.query_step()
# For reused steps whose startedAt are identical, sort them by key
steps.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
for step in steps:
if step.key is not None and step.phase == "Succeeded":
all_step_keys.append(step.key)
return all_step_keys


def get_superop(key):
if "prep-train" in key:
return key.replace("prep-train", "prep-run-train")
elif "run-train-" in key:
return re.sub("run-train-[0-9]*", "prep-run-train", key)
elif "prep-lmp" in key:
return key.replace("prep-lmp", "prep-run-explore")
elif "run-lmp-" in key:
return re.sub("run-lmp-[0-9]*", "prep-run-explore", key)
elif "prep-fp" in key:
return key.replace("prep-fp", "prep-run-fp")
elif "run-fp-" in key:
return re.sub("run-fp-[0-9]*", "prep-run-fp", key)
elif "prep-caly-input" in key:
return key.replace("prep-caly-input", "prep-run-explore")
elif "collect-run-calypso-" in key:
return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-dp-optim-" in key:
return re.sub("prep-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "run-dp-optim-" in key:
return re.sub("run-dp-optim-[0-9]*-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-caly-model-devi" in key:
return key.replace("prep-caly-model-devi", "prep-run-explore")
elif "run-caly-model-devi" in key:
return re.sub("run-caly-model-devi-[0-9]*", "prep-run-explore", key)
elif "caly-evo-step" in key:
return re.sub("caly-evo-step-[0-9]*", "prep-run-explore", key)
elif "diffcsp-gen-" in key:
return re.sub("diffcsp-gen-[0-9]*", "prep-run-explore", key)
elif "prep-relax" in key:
return re.sub("prep-relax", "prep-run-explore", key)
elif "run-relax-" in key:
return re.sub("run-relax-[0-9]*", "prep-run-explore", key)
return None


def fold_keys(all_step_keys):
folded_keys = {}
for key in all_step_keys:
is_superop = False
for superop in ["prep-run-train", "prep-run-explore", "prep-run-fp"]:
if superop in key:
if key not in folded_keys:
folded_keys[key] = []
is_superop = True
break
if is_superop:
continue
superop = get_superop(key)
# if its super OP is succeeded, fold it into its super OP
if superop is not None and superop in all_step_keys:
if superop not in folded_keys:
folded_keys[superop] = []
folded_keys[superop].append(key)
else:
folded_keys[key] = [key]
for k, v in folded_keys.items():
if v == []:
folded_keys[k] = [k]
return folded_keys


def get_resubmit_keys(
wf,
):
all_step_keys = successful_step_keys(wf)
step_keys = [
"prep-run-train",
"prep-train",
"run-train",
"prep-caly-input",
"prep-caly-model-devi",
"run-caly-model-devi",
"prep-run-explore",
"prep-lmp",
"run-lmp",
"diffcsp-gen",
"prep-relax",
"run-relax",
wf_info = wf.query()
all_steps = [
step
for step in wf_info.get_step(sort_by_generation=True)
if step.key is not None
]
super_keys = ["prep-run-train", "prep-run-explore", "prep-run-fp"]
other_keys = [
"select-confs",
"prep-run-fp",
"prep-fp",
"run-fp",
"collect-data",
"scheduler",
"id",
]
if (
len(
matched_step_key(
all_step_keys,
[
"collect-run-calypso",
"prep-dp-optim",
"run-dp-optim",
],
)
)
> 0
):
# calypso default mode
step_keys += [
"collect-run-calypso",
"prep-dp-optim",
"run-dp-optim",
]
else:
# calypso merge mode
step_keys.append("caly-evo-step")

all_step_keys = matched_step_key(
all_step_keys,
step_keys,
)
all_step_keys = sort_slice_ops(
all_step_keys,
["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"],
)
folded_keys = fold_keys(all_step_keys)
folded_keys = {}
for step in all_steps:
if len(matched_step_key([step.key], super_keys)) > 0:
sub_steps = wf_info.get_step(parent_id=step.id, sort_by_generation=True)
sub_keys = [
step.key
for step in sub_steps
if step.key is not None and step.phase == "Succeeded"
]
sub_keys = sort_slice_ops(
sub_keys,
["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"],
)
if step.phase == "Succeeded":
folded_keys[step.key] = sub_keys
else:
for key in sub_keys:
folded_keys[key] = [key]
elif len(matched_step_key([step.key], other_keys)) > 0:
folded_keys[step.key] = [step.key]
return folded_keys


Expand All @@ -955,7 +862,12 @@ def resubmit_concurrent_learning(

old_wf = Workflow(id=wfid)
folded_keys = get_resubmit_keys(old_wf)
all_step_keys = sum(folded_keys.values(), [])
all_step_keys = []
super_keys = {}
for super_key, keys in folded_keys.items():
all_step_keys += keys
for key in keys:
super_keys[key] = super_key

if list_steps:
prt_str = print_keys_in_nice_format(
Expand All @@ -971,21 +883,16 @@ def resubmit_concurrent_learning(
if fold:
reused_folded_keys = {}
for key in reused_keys:
superop = get_superop(key)
if superop is not None:
if superop not in reused_folded_keys:
reused_folded_keys[superop] = []
reused_folded_keys[superop].append(key)
else:
reused_folded_keys[key] = [key]
super_key = super_keys[key]
if super_key not in reused_folded_keys:
reused_folded_keys[super_key] = []
reused_folded_keys[super_key].append(key)
for k, v in reused_folded_keys.items():
# reuse the super OP iif all steps within it are reused
if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]):
if set(v) == set(folded_keys[k]):
reused_folded_keys[k] = [k]
reused_keys = sum(reused_folded_keys.values(), [])
reuse_step = old_wf.query_step(key=reused_keys)
# For reused steps whose startedAt are identical, sort them by key
reuse_step.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
reuse_step = old_wf.query_step(key=reused_keys, sort_by_generation=True)

wf = submit_concurrent_learning(
wf_config,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ classifiers = [
dependencies = [
'numpy',
'dpdata>=0.2.20',
'pydflow>=1.8.95',
'pydflow>=1.8.97',
'dargs>=0.3.1',
'scipy',
'lbg',
Expand Down
Loading

0 comments on commit 067c60f

Please sign in to comment.