
Commit

Merge pull request #46 from CMIP-REF/local-output-dir-clean
lewisjared authored Jan 7, 2025
2 parents 35c4afe + 9f7a917 commit ff7f66d
Showing 18 changed files with 292 additions and 69 deletions.
4 changes: 4 additions & 0 deletions changelog/46.feature.md
@@ -0,0 +1,4 @@
Support the option for executors and the ref CLI to make different assumptions about root paths.

Where possible, path fragments are stored in the database instead of complete paths.
This allows the data folders to be moved without needing to update the database.
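For illustration, a minimal sketch of how this fragment/root split composes, modelled on the to_output_path helper added to MetricExecutionDefinition below; the standalone class, fragment, and root paths here are assumptions for the example, not the package's real API:

import pathlib
from attrs import evolve, frozen

# Illustrative stand-in for ref_core.metrics.MetricExecutionDefinition;
# field and method names mirror the diff below, but this class is only a sketch.
@frozen
class ExecutionPaths:
    output_fragment: pathlib.Path                  # relative path, safe to store in the database
    output_directory: pathlib.Path | None = None   # root path, resolved by each executor

    def to_output_path(self, filename: str | None) -> pathlib.Path:
        if self.output_directory is None:
            raise AssertionError("Output directory is not set")
        base = self.output_directory / self.output_fragment
        return base if filename is None else base / filename

# The database only needs the fragment; each executor supplies its own root,
# so the data folders can move without touching the stored paths.
paths = ExecutionPaths(output_fragment=pathlib.Path("metric-a/run-1"))  # hypothetical fragment
paths = evolve(paths, output_directory=pathlib.Path("/scratch/ref"))    # hypothetical root
print(paths.to_output_path("output.json"))  # /scratch/ref/metric-a/run-1/output.json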
17 changes: 10 additions & 7 deletions docs/how-to-guides/running-metrics-locally.py
@@ -24,9 +24,6 @@
# This guide will walk you through how to run a metric provider locally.


# %%
# !pip install prettyprinter

# %% tags=["remove_input"]
import json
from pathlib import Path
@@ -35,6 +32,7 @@
import pandas as pd
import prettyprinter
import ref_metrics_example
from attr import evolve
from ref_core.datasets import SourceDatasetType
from ref_core.executor import run_metric

@@ -138,12 +136,13 @@
# and which datasets should be used for the metric calculation.

# %%
output_directory = Path("out")
definition = metric_executions[0].build_metric_execution_info()
prettyprinter.pprint(definition)

# %%
# Update the output fragment to be a subdirectory of the current working directory
definition = attrs.evolve(definition, output_fragment=Path("out") / definition.output_fragment)
definition = attrs.evolve(definition, output_fragment=output_directory / definition.output_fragment)
definition.output_fragment

# %% [markdown]
@@ -163,7 +162,9 @@
result

# %%
with open(result.output_bundle) as fh:

output_file = result.definition.to_output_path(result.bundle_filename)
with open(output_file) as fh:
# Load the output bundle and pretty print
loaded_result = json.loads(fh.read())
print(json.dumps(loaded_result, indent=2))
@@ -175,7 +176,9 @@
# This will not perform any validation/verification of the output results.

# %%
direct_result = metric.run(definition=definition)
direct_result = metric.run(definition=evolve(definition, output_directory=output_directory))
assert direct_result.successful

direct_result
prettyprinter.pprint(direct_result)

# %%
22 changes: 18 additions & 4 deletions packages/ref-core/src/ref_core/executor/local.py
@@ -1,3 +1,7 @@
from attrs import evolve
from loguru import logger

from ref.config import Config
from ref_core.metrics import Metric, MetricExecutionDefinition, MetricResult


@@ -12,6 +16,9 @@ class LocalExecutor:

name = "local"

def __init__(self, config: Config | None = None):
self.config = Config.default() if config is None else config

def run_metric(self, metric: Metric, definition: MetricExecutionDefinition) -> MetricResult:
"""
Run a metric in process
@@ -28,7 +35,14 @@ def run_metric(self, metric: Metric, definition: MetricExecutionDefinition) -> M
:
Results from running the metric
"""
# TODO: Update fragment use the output directory which may vary depending on the executor
definition.output_fragment.mkdir(parents=True, exist_ok=True)

return metric.run(definition=definition)
# TODO: This should be changed to use executor specific configuration
definition = evolve(definition, output_directory=self.config.paths.tmp)
execution_output_path = definition.to_output_path(filename=None)
execution_output_path.mkdir(parents=True, exist_ok=True)

try:
return metric.run(definition=definition)
# TODO: Copy results to the output directory
except Exception:
logger.exception(f"Error running metric {metric.slug}")
return MetricResult.build_from_failure(definition)
103 changes: 88 additions & 15 deletions packages/ref-core/src/ref_core/metrics.py
@@ -17,13 +17,6 @@ class MetricExecutionDefinition:
This represents the information needed by a metric to perform a single execution of the metric
"""

output_fragment: pathlib.Path
"""
Relative directory to store the output of the metric execution
This is relative to the temporary directory which may differ by executor.
"""

key: str
"""
A unique identifier for the metric execution
@@ -37,6 +30,43 @@
Collection of datasets required for the metric execution
"""

output_fragment: pathlib.Path
"""
Relative directory to store the output of the metric execution
This is relative to the temporary directory which may differ by executor.
"""

output_directory: pathlib.Path | None = None
"""
Root directory for output data
This will be resolved by the executor as the output directory may vary depending on where
the executor is being run.
"""

def to_output_path(self, filename: str | None) -> pathlib.Path:
"""
Get the absolute path for a file in the output directory
Parameters
----------
filename
Name of the file to get the full path for
Returns
-------
:
Full path to the file in the output directory
"""
if self.output_directory is None:
raise AssertionError("Output directory is not set") # pragma: no cover

if filename is None:
return self.output_directory / self.output_fragment
else:
return self.output_directory / self.output_fragment / filename


@frozen
class MetricResult:
@@ -49,28 +79,39 @@ class MetricResult:

# Do we want to load a serialised version of the output bundle here or just a file path?

output_bundle: pathlib.Path | None
definition: MetricExecutionDefinition
"""
The definition of the metric execution that produced this result.
"""
Path to the output bundle file.

bundle_filename: pathlib.Path | None
"""
Filename of the output bundle file relative to the execution directory.
The absolute path of the outputs may differ between executors
depending on where the output directory is mounted.
The contents of this file are defined by
[EMDS standard](https://github.com/Earth-System-Diagnostics-Standards/EMDS/blob/main/standards.md#common-output-bundle-format-)
"""

successful: bool
"""
Whether the metric ran successfully.
"""
# Log info is in the output bundle file already, but is definitely useful

@staticmethod
def build(configuration: MetricExecutionDefinition, cmec_output_bundle: dict[str, Any]) -> "MetricResult":
def build_from_output_bundle(
definition: MetricExecutionDefinition, cmec_output_bundle: dict[str, Any]
) -> "MetricResult":
"""
Build a MetricResult from a CMEC output bundle.
Parameters
----------
configuration
The configuration used to run the metric.
definition
The execution definition.
cmec_output_bundle
An output bundle in the CMEC format.
@@ -82,13 +123,45 @@ def build(configuration: MetricExecutionDefinition, cmec_output_bundle: dict[str
A prepared MetricResult object.
The output bundle will be written to the output directory.
"""
with open(configuration.output_fragment / "output.json", "w") as file_handle:
definition.to_output_path(filename=None).mkdir(parents=True, exist_ok=True)
bundle_path = definition.to_output_path("output.json")

with open(bundle_path, "w") as file_handle:
json.dump(cmec_output_bundle, file_handle)
return MetricResult(
output_bundle=configuration.output_fragment / "output.json",
definition=definition,
bundle_filename=pathlib.Path("output.json"),
successful=True,
)

@staticmethod
def build_from_failure(definition: MetricExecutionDefinition) -> "MetricResult":
"""
Build a failed metric result.
This is a placeholder.
Additional log information should still be captured in the output bundle.
"""
return MetricResult(bundle_filename=None, successful=False, definition=definition)

def to_output_path(self, filename: str | None) -> pathlib.Path:
"""
Get the absolute path for a file in the output directory
Parameters
----------
filename
Name of the file to get the full path for
If None the path to the output bundle will be returned
Returns
-------
:
Full path to the file in the output directory
"""
return self.definition.to_output_path(filename)


@frozen(hash=True)
class DataRequirement:
@@ -122,7 +195,7 @@ class DataRequirement:
"""
The fields to group the datasets by.
This groupby operation is performed after the data catalog is filtered according to `filters`.
This group by operation is performed after the data catalog is filtered according to `filters`.
Each group will contain a unique combination of values from the metadata fields,
and will result in a separate execution of the metric.
If `group_by=None`, all datasets will be processed together as a single execution.
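For illustration, a hedged sketch of the group-by behaviour the docstring above describes; the catalog columns and values are invented for the example and are not the real CMIP6 facet schema:

import pandas as pd

# Hypothetical, already-filtered data catalog (columns are assumptions for the example)
catalog = pd.DataFrame(
    {
        "source_id": ["ACCESS-ESM1-5", "ACCESS-ESM1-5", "MIROC6"],
        "variable_id": ["tas", "tas", "tas"],
        "path": ["a.nc", "b.nc", "c.nc"],
    }
)

group_by = ("source_id", "variable_id")
if group_by is None:
    executions = [catalog]  # all datasets processed together as a single execution
else:
    # One metric execution per unique combination of the group-by fields
    executions = [group for _, group in catalog.groupby(list(group_by))]

print(len(executions))  # 2 -> two separate metric executions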
8 changes: 4 additions & 4 deletions packages/ref-core/tests/conftest.py
@@ -17,9 +17,11 @@ def __init__(self, temp_dir: pathlib.Path) -> None:
data_requirements = (DataRequirement(source_type=SourceDatasetType.CMIP6, filters=(), group_by=None),)

def run(self, definition: MetricExecutionDefinition) -> MetricResult:
# TODO: This doesn't write output.json, use build function?
return MetricResult(
output_bundle=self.temp_dir / definition.output_fragment / "output.json",
bundle_filename=self.temp_dir / definition.output_fragment / "output.json",
successful=True,
definition=definition,
)


@@ -30,9 +32,7 @@ class FailedMetric:
data_requirements = (DataRequirement(source_type=SourceDatasetType.CMIP6, filters=(), group_by=None),)

def run(self, definition: MetricExecutionDefinition) -> MetricResult:
return MetricResult(
successful=False,
)
return MetricResult.build_from_failure(definition)


@pytest.fixture
11 changes: 10 additions & 1 deletion packages/ref-core/tests/unit/test_executor.py
@@ -25,7 +25,16 @@ def test_run_metric(self, metric_definition, mock_metric):

result = executor.run_metric(mock_metric, metric_definition)
assert result.successful
assert result.output_bundle == metric_definition.output_fragment / "output.json"
assert result.bundle_filename == metric_definition.output_fragment / "output.json"

def test_raises_exception(self, metric_definition, mock_metric):
executor = LocalExecutor()

mock_metric.run = lambda definition: 1 / 0

result = executor.run_metric(mock_metric, metric_definition)
assert result.successful is False
assert result.bundle_filename is None


@pytest.mark.parametrize("executor_name", ["local", None])
32 changes: 25 additions & 7 deletions packages/ref-core/tests/unit/test_metrics.py
@@ -2,24 +2,42 @@

import pandas as pd
import pytest
from attr import evolve
from ref_core.datasets import FacetFilter, SourceDatasetType
from ref_core.metrics import DataRequirement, MetricExecutionDefinition, MetricResult


class TestMetricResult:
def test_build(self, tmp_path):
config = MetricExecutionDefinition(
def test_build_from_output_bundle(self, tmp_path):
definition = MetricExecutionDefinition(
output_fragment=tmp_path, key="mocked-metric-slug", metric_dataset=None
)
result = MetricResult.build(config, {"data": "value"})
# Setting the output directory generally happens as a side effect of the executor
definition = evolve(definition, output_directory=tmp_path)

result = MetricResult.build_from_output_bundle(definition, {"data": "value"})

assert result.successful
assert result.output_bundle.exists()
assert result.output_bundle.is_file()
with open(result.output_bundle) as f:

# Convert relative path to absolute path
output_filename = result.to_output_path(result.bundle_filename)

assert output_filename.exists()
assert output_filename.is_file()
with open(output_filename) as f:
assert f.read() == '{"data": "value"}'

assert result.output_bundle.is_relative_to(tmp_path)
assert output_filename.is_relative_to(tmp_path)

def test_build_from_failure(self):
definition = MetricExecutionDefinition(
output_fragment="output", key="mocked-metric-slug", metric_dataset=None
)
result = MetricResult.build_from_failure(definition)

assert not result.successful
assert result.bundle_filename is None
assert result.definition == definition


@pytest.fixture
5 changes: 0 additions & 5 deletions packages/ref-metrics-esmvaltool/pyproject.toml
@@ -30,11 +30,6 @@ dependencies = [
[project.license]
text = "Apache-2.0"

[tool.uv]
dev-dependencies = [
"pytest-mock >= 3.12",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
@@ -105,4 +105,6 @@ def run(self, definition: MetricExecutionDefinition) -> MetricResult:
result = next(result_dir.glob("work/timeseries/script1/*.nc"))
annual_mean_global_mean_timeseries = xarray.open_dataset(result)

return MetricResult.build(definition, format_cmec_output_bundle(annual_mean_global_mean_timeseries))
return MetricResult.build_from_output_bundle(
definition, format_cmec_output_bundle(annual_mean_global_mean_timeseries)
)