From cea027dc317043fb6aa49469bb81246f62e6ebc3 Mon Sep 17 00:00:00 2001 From: Geoffrey Yu Date: Sun, 7 Feb 2021 22:29:34 -0500 Subject: [PATCH] [Tasks] Add run_experiment_group() task type (#27) --- errors/errors.yml | 15 ++++++ src/conductor/errors/generated.py | 30 +++++++++++ src/conductor/parsing/task_loader.py | 26 +++++++--- src/conductor/task_types/stdlib/__init__.py | 5 ++ .../task_types/stdlib/run_experiment_group.py | 52 +++++++++++++++++++ .../experiments/invalid-group-duplicate/COND | 8 +++ .../experiments/invalid-group-type/COND | 8 +++ tests/fixture-projects/experiments/sweep/COND | 11 ++++ .../fixture-projects/experiments/sweep/run.sh | 3 ++ tests/run_experiment_group_test.py | 46 ++++++++++++++++ 10 files changed, 198 insertions(+), 6 deletions(-) create mode 100644 src/conductor/task_types/stdlib/__init__.py create mode 100644 src/conductor/task_types/stdlib/run_experiment_group.py create mode 100644 tests/fixture-projects/experiments/invalid-group-duplicate/COND create mode 100644 tests/fixture-projects/experiments/invalid-group-type/COND create mode 100644 tests/fixture-projects/experiments/sweep/COND create mode 100755 tests/fixture-projects/experiments/sweep/run.sh create mode 100644 tests/run_experiment_group_test.py diff --git a/errors/errors.yml b/errors/errors.yml index 818a46f..7e40a73 100644 --- a/errors/errors.yml +++ b/errors/errors.yml @@ -65,6 +65,21 @@ processing task '{identifier}'. All experiment option values must be either a string, integer, floating point number, or boolean. +1014: + name: ExperimentGroupDuplicateName + message: >- + Encountered a duplicate experiment instance name '{instance_name}' when + processing a run_experiment_group() task with name '{task_name}'. + Experiment instance names must be unique. + +1015: + name: ExperimentGroupInvalidExperimentInstance + message: >- + Encountered an experiment instance that was incorrectly formed when + processing a run_experiment_group() task with name '{task_name}'. + Experiments in an experiment group must be defined using an iterable of + ExperimentInstance named tuples. + # Task graph loading errors (error code 2xxx) 2001: diff --git a/src/conductor/errors/generated.py b/src/conductor/errors/generated.py index da86e82..2945047 100644 --- a/src/conductor/errors/generated.py +++ b/src/conductor/errors/generated.py @@ -200,6 +200,34 @@ def _message(self): ) +class ExperimentGroupDuplicateName(ConductorError): + error_code = 1014 + + def __init__(self, **kwargs): + super().__init__() + self.instance_name = kwargs["instance_name"] + self.task_name = kwargs["task_name"] + + def _message(self): + return "Encountered a duplicate experiment instance name '{instance_name}' when processing a run_experiment_group() task with name '{task_name}'. Experiment instance names must be unique.".format( + instance_name=self.instance_name, + task_name=self.task_name, + ) + + +class ExperimentGroupInvalidExperimentInstance(ConductorError): + error_code = 1015 + + def __init__(self, **kwargs): + super().__init__() + self.task_name = kwargs["task_name"] + + def _message(self): + return "Encountered an experiment instance that was incorrectly formed when processing a run_experiment_group() task with name '{task_name}'. Experiments in an experiment group must be defined using a list of ExperimentInstance named tuples.".format( + task_name=self.task_name, + ) + + class TaskNotFound(ConductorError): error_code = 2001 @@ -386,6 +414,8 @@ def _message(self): "CombineDuplicateDepName", "ExperimentOptionsNonStringKey", "ExperimentOptionsNonPrimitiveValue", + "ExperimentGroupDuplicateName", + "ExperimentGroupInvalidExperimentInstance", "TaskNotFound", "MissingProjectRoot", "CyclicDependency", diff --git a/src/conductor/parsing/task_loader.py b/src/conductor/parsing/task_loader.py index 892d319..a74aa53 100644 --- a/src/conductor/parsing/task_loader.py +++ b/src/conductor/parsing/task_loader.py @@ -6,17 +6,14 @@ ParsingUnknownNameError, TaskSyntaxError, ) +from conductor.task_types.stdlib import STDLIB_FILES class TaskLoader: def __init__(self): self._tasks = None self._current_cond_file_path = None - self._task_constructors = {} - for raw_task_type in raw_task_types.values(): - self._task_constructors[raw_task_type.name] = self._wrap_task_function( - raw_task_type.load_from_cond_file - ) + self._conductor_scope = self._compile_scope() def parse_cond_file(self, cond_file_path): """ @@ -29,7 +26,7 @@ def parse_cond_file(self, cond_file_path): with open(cond_file_path) as file: code = file.read() # pylint: disable=exec-used - exec(code, self._task_constructors, self._task_constructors) + exec(code, self._conductor_scope.copy()) return tasks except ConductorError as ex: ex.add_file_context(file_path=cond_file_path) @@ -53,6 +50,23 @@ def parse_cond_file(self, cond_file_path): self._tasks = None self._current_cond_file_path = None + def _compile_scope(self): + scope = {} + # Create the task constructors for Conductor's foundational task types. + for raw_task_type in raw_task_types.values(): + scope[raw_task_type.name] = self._wrap_task_function( + raw_task_type.load_from_cond_file + ) + # We need to explicitly `compile()` the Conductor standard library + # files here to ensure that any uses of Conductor's foundational task + # types bind to the task constructors defined above. + for lib_file_path in STDLIB_FILES: + with open(lib_file_path, "r") as lib_file: + code = compile(lib_file.read(), str(lib_file_path), "exec") + # pylint: disable=exec-used + exec(code, scope) + return scope + def _wrap_task_function(self, task_constructor): def shim(**kwargs): raw_task = task_constructor(**kwargs) diff --git a/src/conductor/task_types/stdlib/__init__.py b/src/conductor/task_types/stdlib/__init__.py new file mode 100644 index 0000000..5499b44 --- /dev/null +++ b/src/conductor/task_types/stdlib/__init__.py @@ -0,0 +1,5 @@ +import pathlib + +_MODULE_PATH = pathlib.Path(__file__).resolve().parent + +STDLIB_FILES = [_MODULE_PATH / "run_experiment_group.py"] diff --git a/src/conductor/task_types/stdlib/run_experiment_group.py b/src/conductor/task_types/stdlib/run_experiment_group.py new file mode 100644 index 0000000..daa6279 --- /dev/null +++ b/src/conductor/task_types/stdlib/run_experiment_group.py @@ -0,0 +1,52 @@ +from typing import Dict, Iterable, NamedTuple, Optional, Sequence +from conductor.utils.experiment_options import OptionValue +from conductor.errors import ( + ExperimentGroupDuplicateName, + ExperimentGroupInvalidExperimentInstance, +) + + +class ExperimentInstance(NamedTuple): + name: str + options: Dict[str, OptionValue] + + +def run_experiment_group( + name: str, + run: str, + experiments: Iterable[ExperimentInstance], + deps: Optional[Sequence[str]] = None, +) -> None: + task_deps = deps if deps is not None else [] + relative_experiment_identifiers = [] + + try: + seen_experiment_names = set() + for experiment in experiments: + if not isinstance(experiment, ExperimentInstance): + raise ExperimentGroupInvalidExperimentInstance(task_name=name) + if experiment.name in seen_experiment_names: + raise ExperimentGroupDuplicateName( + task_name=name, instance_name=experiment.name + ) + + seen_experiment_names.add(experiment.name) + # run_experiment(): Defined by Conductor at runtime + # pylint: disable=undefined-variable + run_experiment( # type: ignore + name=experiment.name, + run=run, + options=experiment.options, + deps=task_deps, + ) + relative_experiment_identifiers.append(":" + experiment.name) + + except TypeError as ex: + raise ExperimentGroupInvalidExperimentInstance(task_name=name) from ex + + # combine(): Defined by Conductor at runtime + # pylint: disable=undefined-variable + combine( # type: ignore + name=name, + deps=relative_experiment_identifiers, + ) diff --git a/tests/fixture-projects/experiments/invalid-group-duplicate/COND b/tests/fixture-projects/experiments/invalid-group-duplicate/COND new file mode 100644 index 0000000..173f941 --- /dev/null +++ b/tests/fixture-projects/experiments/invalid-group-duplicate/COND @@ -0,0 +1,8 @@ +run_experiment_group( + name="test", + run="exit 0", + experiments=[ + ExperimentInstance(name="test-inst", options={}), + ExperimentInstance(name="test-inst", options={}), + ] +) diff --git a/tests/fixture-projects/experiments/invalid-group-type/COND b/tests/fixture-projects/experiments/invalid-group-type/COND new file mode 100644 index 0000000..dfe198d --- /dev/null +++ b/tests/fixture-projects/experiments/invalid-group-type/COND @@ -0,0 +1,8 @@ +run_experiment_group( + name="test", + run="exit 0", + experiments=[ + {"name": "test-inst", "options": {}}, + {"name": "test-inst-2", "options": {}}, + ] +) diff --git a/tests/fixture-projects/experiments/sweep/COND b/tests/fixture-projects/experiments/sweep/COND new file mode 100644 index 0000000..d1dd17b --- /dev/null +++ b/tests/fixture-projects/experiments/sweep/COND @@ -0,0 +1,11 @@ +run_experiment_group( + name="threads", + run="./run.sh", + experiments=[ + ExperimentInstance( + name="threads-{}".format(threads), + options={"threads": threads}, + ) + for threads in range(1, 5) + ], +) diff --git a/tests/fixture-projects/experiments/sweep/run.sh b/tests/fixture-projects/experiments/sweep/run.sh new file mode 100755 index 0000000..54066e7 --- /dev/null +++ b/tests/fixture-projects/experiments/sweep/run.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +echo $1 > ${COND_OUT}/output.txt diff --git a/tests/run_experiment_group_test.py b/tests/run_experiment_group_test.py new file mode 100644 index 0000000..5092da8 --- /dev/null +++ b/tests/run_experiment_group_test.py @@ -0,0 +1,46 @@ +import pathlib +from conductor.config import TASK_OUTPUT_DIR_SUFFIX +from .conductor_runner import ( + ConductorRunner, + FIXTURE_TEMPLATES, +) + + +def test_run_experiment_group(tmp_path: pathlib.Path): + cond = ConductorRunner.from_template(tmp_path, FIXTURE_TEMPLATES["experiments"]) + result = cond.run("//sweep:threads") + assert result.returncode == 0 + assert cond.output_path.is_dir() + + combined_dir = pathlib.Path( + cond.output_path, "sweep", ("threads" + TASK_OUTPUT_DIR_SUFFIX) + ) + assert combined_dir.is_dir() + + expected_tasks = ["threads-{}".format(threads) for threads in range(1, 5)] + + # Ensure combined task dirs all exist and are non-empty. + combined_dir_names = [path.name for path in combined_dir.iterdir()] + for task_name in expected_tasks: + assert task_name in combined_dir_names + assert any(True for _ in (combined_dir / task_name).iterdir()) + + # Ensure individual experiment dirs also exist. + sweep_output = combined_dir.parent + assert sweep_output.is_dir() + sweep_output_count = len(list(sweep_output.iterdir())) + + # 4 experiment instances plus the combined output dir. + assert sweep_output_count == 5 + + +def test_run_experiment_group_invalid_duplicate(tmp_path: pathlib.Path): + cond = ConductorRunner.from_template(tmp_path, FIXTURE_TEMPLATES["experiments"]) + result = cond.run("//invalid-group-duplicate:test") + assert result.returncode != 0 + + +def test_run_experiment_group_invalid_type(tmp_path: pathlib.Path): + cond = ConductorRunner.from_template(tmp_path, FIXTURE_TEMPLATES["experiments"]) + result = cond.run("//invalid-group-type:test") + assert result.returncode != 0