
DAG-based mapping and execution #31

Merged: 57 commits from dag into main, Jul 19, 2023
Conversation

WarmCyan (Collaborator)
Closes #29

The intent of this PR is to have curifactory determine need-to-execute for each stage primarily from the DAG: a stage runs only if its outputs are needed somewhere downstream, based on the cache status of later stages' outputs, overwrite conditions, and so on.

This also allows skipping the loading of an artifact into memory entirely if no executed stage will ever need it.

WarmCyan (Collaborator, Author) commented Apr 5, 2023

Unclear on the best terminology for the "soft inputs" that aggregate stages can now take; currently it's expected_state. Something like inputs would be consistent with a regular stage, but since these aren't handled the same way (they're not directly passed into the function), it seems odd to use the same name. Alternatively, we could make these 'inputs' actually get passed into the function itself, as lists of values or as a list of tuples representing each 'call'.

WarmCyan (Collaborator, Author) commented Apr 6, 2023

The basic concepts have been implemented. The idea is that the full run_experiment code is first run in map_mode as a forward pass: every stage short-circuits before running or loading anything, just collecting info about itself and building up 'pseudo-records'. After this forward pass completes, the manager populates the DAG with a copy of these records and all of the pseudo-artifacts (a modified ArtifactRepresentation that loads metadata if the underlying artifact was found in cache).
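
Roughly, the short-circuit idea looks something like this (a self-contained sketch; PseudoArtifact, PseudoRecord, and map_stage are illustrative names, not curifactory's actual internals):

from dataclasses import dataclass, field

@dataclass
class PseudoArtifact:
    name: str
    cached: bool  # whether a cached copy was found on disk during mapping

@dataclass
class PseudoRecord:
    stages: list[str] = field(default_factory=list)
    state: dict[str, PseudoArtifact] = field(default_factory=dict)

def map_stage(record: PseudoRecord, stage_name: str, outputs: list[str], in_cache) -> PseudoRecord:
    # In map mode a stage only registers itself and its outputs, checking
    # whether each output already exists in the cache; nothing is loaded
    # or computed.
    record.stages.append(stage_name)
    for name in outputs:
        record.state[name] = PseudoArtifact(name, cached=in_cache(stage_name, name))
    return record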

The DAG then analyzes all of the records and artifact metadata to gather execution trees for each leaf stage (a stage whose outputs are not used as inputs by any other stage in the experiment), and walks each tree to determine which stages need to execute based on cache and overwrite status. (Specifying --overwrite-stage will now also automatically apply to impacted later stages.)
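
To illustrate the analysis (hypothetical names and signatures, not the real implementation):

def find_leaf_stages(consumers: dict[str, set[str]]) -> set[str]:
    # consumers maps each stage to the set of stages that use its outputs;
    # a leaf stage is one whose outputs no other stage consumes.
    return {stage for stage, used_by in consumers.items() if not used_by}

def plan_execution(stage: str, deps: dict, is_cached, overwritten: set, plan: set) -> set:
    # Walk up from a leaf: a stage must run if its outputs are uncached or
    # explicitly overwritten, and any uncached upstream dependencies of a
    # running stage must run as well. (The real implementation also
    # propagates overwrites downstream to impacted later stages.)
    if stage in overwritten or not is_cached(stage):
        plan.add(stage)
        for dep in deps.get(stage, ()):
            plan_execution(dep, deps, is_cached, overwritten, plan)
    return plan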

This allows inspecting the "execution plan" of an experiment without actually running it, e.g. with the new --map-only flag on the CLI, which reports which values are cached, which run they were cached from, and the list of stages that will actually be executed.
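
For example, assuming curifactory's usual experiment CLI entry point (only the --map-only flag itself comes from this PR; the exact invocation below is an assumption), mapping the newsgroups example without running it might look like:

experiment newsgroups --map-only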

An example is below: this is the newsgroups example experiment, showing that a couple of the records' parameter sets had been run previously (newsgroups_210 and newsgroups_212), so those values will be reused from cache.

[screenshot: --map-only output listing the cached artifacts for newsgroups_210 and newsgroups_212]

Farther up (this output will be cleaned up at some point), the full execution tree is shown: a rough tree view of each stage with its dependency stages indented beneath it. Following that is the execution list, a list of (record index, stage name) tuples representing the stages that will actually execute during the experiment run.

[screenshot: execution tree and execution list output]

Something this should enable: we might be able to automatically unload values from state (conditioned on not being in interactive mode, and probably gated behind some other flag like no_unload) by checking the DAG's is_output_used_anywhere at the end of a stage and deleting the value from state if it isn't used.
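
A hypothetical sketch of that hook (is_output_used_anywhere and the no_unload idea come from the paragraph above; the signatures are assumptions):

def maybe_unload(record, stage_name: str, outputs: list[str], dag, interactive: bool, no_unload: bool):
    # Never unload in interactive mode or when explicitly disabled.
    if interactive or no_unload:
        return
    for name in outputs:
        # If no later stage ever consumes this output, drop it from
        # in-memory state; the cached copy on disk is untouched.
        if not dag.is_output_used_anywhere(record, stage_name, name):
            del record.state[name]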

An interesting side effect and improvement from adding the explicit expected_state is that stage maps now directly show which outputs go into an aggregate stage, rather than just an arrow from the entire record to the aggregate stage as before.
[screenshot: stage map with per-output edges into the aggregate stage]

There are cases where the same stage might get called multiple times with the same arguments. The DAG only sees that the cached values don't exist at first, and since it doesn't update while the experiment is running, it assumes every subsequent call of that stage still needs to execute. By keeping the cache check even when the DAG thinks a stage should execute, we ensure it can still use values cached earlier in the same run and skip execution.
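
A minimal sketch of that guard (hypothetical names and signatures):

def should_execute(record_index: int, stage_name: str, execution_list: set, outputs_cached) -> bool:
    if (record_index, stage_name) not in execution_list:
        return False  # the DAG determined these outputs are never needed
    # The execution list was computed before the run began, so re-check the
    # cache anyway: an identical stage call (same arguments) earlier in this
    # run may already have cached the outputs.
    return not outputs_cached(record_index, stage_name)
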
Separately, it seems that if the root logger's level is set to INFO and there are no handlers, Python's logging falls back to a default handler anyway. I resolved this by setting the logging level to ERROR when quiet is set.
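
The fix amounts to something like this (a minimal sketch; the actual flag handling lives in the CLI):

import logging

def configure_root_logging(quiet: bool):
    # With no handlers configured, Python's logging still emits records
    # through a default handler, so quiet mode raises the root level past
    # INFO rather than relying on handler configuration.
    logging.getLogger().setLevel(logging.ERROR if quiet else logging.INFO)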

Fixes #33
@WarmCyan WarmCyan marked this pull request as ready for review July 7, 2023 17:48
jasonmhite (Collaborator) left a comment:

Nathan and I discussed briefly at a high level and this approach seems sound to me.

stewartsl (Member) left a comment:

I have no problems with the overall approach.

mbadams5 (Collaborator) left a comment:

Looks good. Just a few minor comments/questions and some code suggestions.

Resolved review threads: curifactory/dag.py (5 threads, 1 outdated), curifactory/staging.py (5 threads, all outdated)
WarmCyan (Collaborator, Author)

After discussion with Mark, we figured that a better way of handling expected_state for @aggregate would be to make it more consistent with @stage: naming it inputs and, where possible, actually passing the associated state into the function itself as arguments.

The idea is to pass the state values in via dictionaries keyed by the associated records from the passed records list. Records that don't have the specified name in their state simply won't appear in that argument's dictionary.

As an example, where a previous aggregate stage might have looked like:

@aggregate(outputs=["final_results"])
def combine_results(record: Record, records: list[Record]):
    final_results = {}
    # use a distinct loop variable to avoid shadowing the `record` parameter
    for r in records:
        if "results" in r.state:
            final_results[r.args.name] = r.state["results"]
    return final_results

could now become:

@aggregate(inputs=["results"], outputs=["final_results"])
def combine_results(record: Record, records: list[Record], results: dict[Record, float]):
    final_results = {}
    # `results` maps each record that produced a "results" artifact to its value
    for r, result in results.items():
        final_results[r.args.name] = result
    return final_results

This eliminates the need to directly access state on the records and removes the boilerplate of checking whether each record's state contains the artifact at all, while retaining the benefits described above of an explicit expected_state/inputs.

WarmCyan merged commit 7b0a320 into main on Jul 19, 2023
2 checks passed
WarmCyan deleted the dag branch on July 19, 2023 16:26