Skip to content

Commit

Permalink
sort keys in showkey by virtue of dflow; fold resubmit steps by virtu…
Browse files Browse the repository at this point in the history
…e of dflow

Signed-off-by: zjgemi <liuxin_zijian@163.com>
  • Loading branch information
zjgemi committed Oct 25, 2024
1 parent 35e0b97 commit 19ee8d5
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 237 deletions.
165 changes: 34 additions & 131 deletions dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,136 +808,39 @@ def print_list_steps(
return "\n".join(ret)


def successful_step_keys(wf):
all_step_keys = []
steps = wf.query_step()
# For reused steps whose startedAt are identical, sort them by key
steps.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
for step in steps:
if step.key is not None and step.phase == "Succeeded":
all_step_keys.append(step.key)
return all_step_keys


def get_superop(key):
if "prep-train" in key:
return key.replace("prep-train", "prep-run-train")
elif "run-train-" in key:
return re.sub("run-train-[0-9]*", "prep-run-train", key)
elif "prep-lmp" in key:
return key.replace("prep-lmp", "prep-run-explore")
elif "run-lmp-" in key:
return re.sub("run-lmp-[0-9]*", "prep-run-explore", key)
elif "prep-fp" in key:
return key.replace("prep-fp", "prep-run-fp")
elif "run-fp-" in key:
return re.sub("run-fp-[0-9]*", "prep-run-fp", key)
elif "prep-caly-input" in key:
return key.replace("prep-caly-input", "prep-run-explore")
elif "collect-run-calypso-" in key:
return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-dp-optim-" in key:
return re.sub("prep-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "run-dp-optim-" in key:
return re.sub("run-dp-optim-[0-9]*-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-caly-model-devi" in key:
return key.replace("prep-caly-model-devi", "prep-run-explore")
elif "run-caly-model-devi" in key:
return re.sub("run-caly-model-devi-[0-9]*", "prep-run-explore", key)
elif "caly-evo-step" in key:
return re.sub("caly-evo-step-[0-9]*", "prep-run-explore", key)
elif "diffcsp-gen-" in key:
return re.sub("diffcsp-gen-[0-9]*", "prep-run-explore", key)
elif "prep-relax" in key:
return re.sub("prep-relax", "prep-run-explore", key)
elif "run-relax-" in key:
return re.sub("run-relax-[0-9]*", "prep-run-explore", key)
return None


def fold_keys(all_step_keys):
folded_keys = {}
for key in all_step_keys:
is_superop = False
for superop in ["prep-run-train", "prep-run-explore", "prep-run-fp"]:
if superop in key:
if key not in folded_keys:
folded_keys[key] = []
is_superop = True
break
if is_superop:
continue
superop = get_superop(key)
# if its super OP is succeeded, fold it into its super OP
if superop is not None and superop in all_step_keys:
if superop not in folded_keys:
folded_keys[superop] = []
folded_keys[superop].append(key)
else:
folded_keys[key] = [key]
for k, v in folded_keys.items():
if v == []:
folded_keys[k] = [k]
return folded_keys


def get_resubmit_keys(
wf,
):
all_step_keys = successful_step_keys(wf)
step_keys = [
wf_info = wf.query()
all_steps = [step for step in wf_info.get_step(sort_by_generation=True) if step.key is not None]
super_keys = [
"prep-run-train",
"prep-train",
"run-train",
"prep-caly-input",
"prep-caly-model-devi",
"run-caly-model-devi",
"prep-run-explore",
"prep-lmp",
"run-lmp",
"diffcsp-gen",
"prep-relax",
"run-relax",
"prep-run-fp"
]
other_keys = [
"select-confs",
"prep-run-fp",
"prep-fp",
"run-fp",
"collect-data",
"scheduler",
"id",
]
if (
len(
matched_step_key(
all_step_keys,
[
"collect-run-calypso",
"prep-dp-optim",
"run-dp-optim",
],
)
)
> 0
):
# calypso default mode
step_keys += [
"collect-run-calypso",
"prep-dp-optim",
"run-dp-optim",
]
else:
# calypso merge mode
step_keys.append("caly-evo-step")

all_step_keys = matched_step_key(
all_step_keys,
step_keys,
)
all_step_keys = sort_slice_ops(
all_step_keys,
["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"],
)
folded_keys = fold_keys(all_step_keys)
folded_keys = {}
for step in all_steps:
if len(matched_step_key([step.key], super_keys)) > 0:
sub_steps = wf_info.get_step(parent_id=step.id, sort_by_generation=True)
sub_keys = [step.key for step in sub_steps if step.key is not None and step.phase == "Succeeded"]
sub_keys = sort_slice_ops(
sub_keys,
["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"],
)
if step.phase == "Succeeded":
folded_keys[step.key] = sub_keys
else:
for key in sub_keys:
folded_keys[key] = [key]
elif len(matched_step_key([step.key], other_keys)) > 0:
folded_keys[step.key] = [step.key]
return folded_keys


Expand All @@ -955,7 +858,12 @@ def resubmit_concurrent_learning(

old_wf = Workflow(id=wfid)
folded_keys = get_resubmit_keys(old_wf)
all_step_keys = sum(folded_keys.values(), [])
all_step_keys = []
super_keys = {}
for super_key, keys in folded_keys.items():
all_step_keys += keys
for key in keys:
super_keys[key] = super_key

if list_steps:
prt_str = print_keys_in_nice_format(
Expand All @@ -971,21 +879,16 @@ def resubmit_concurrent_learning(
if fold:
reused_folded_keys = {}
for key in reused_keys:
superop = get_superop(key)
if superop is not None:
if superop not in reused_folded_keys:
reused_folded_keys[superop] = []
reused_folded_keys[superop].append(key)
else:
reused_folded_keys[key] = [key]
super_key = super_keys[key]
if super_key not in reused_folded_keys:
reused_folded_keys[super_key] = []
reused_folded_keys[super_key].append(key)
for k, v in reused_folded_keys.items():
# reuse the super OP iif all steps within it are reused
if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]):
if set(v) == set(folded_keys[k]):
reused_folded_keys[k] = [k]
reused_keys = sum(reused_folded_keys.values(), [])
reuse_step = old_wf.query_step(key=reused_keys)
# For reused steps whose startedAt are identical, sort them by key
reuse_step.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
reuse_step = old_wf.query_step(key=reused_keys, sort_by_generation=True)

wf = submit_concurrent_learning(
wf_config,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ classifiers = [
dependencies = [
'numpy',
'dpdata>=0.2.20',
'pydflow>=1.8.95',
'pydflow>=1.8.97',
'dargs>=0.3.1',
'scipy',
'lbg',
Expand Down
Loading

0 comments on commit 19ee8d5

Please sign in to comment.