Commit
Merge pull request #29 from CMIP-REF/solve
lewisjared authored Dec 4, 2024
2 parents 931e230 + 31c68a9 commit 5873ac9
Showing 25 changed files with 650 additions and 300 deletions.
5 changes: 5 additions & 0 deletions changelog/29.feature.md
@@ -0,0 +1,5 @@
Translated selected groups of datasets into `MetricDataset`s.
Each `MetricDataset` contains all of the datasets needed for a given execution of a metric.

Added a slug to the `MetricDataset` to uniquely identify the execution
and make it easier to identify the execution in the logs.
9 changes: 9 additions & 0 deletions conftest.py
@@ -7,11 +7,20 @@
from pathlib import Path

import esgpull
import pandas as pd
import pytest

from ref.datasets.cmip6 import CMIP6DatasetAdapter


@pytest.fixture
def esgf_data_dir() -> Path:
pull = esgpull.Esgpull()

return pull.config.paths.data


@pytest.fixture
def cmip6_data_catalog(esgf_data_dir) -> pd.DataFrame:
adapter = CMIP6DatasetAdapter()
return adapter.find_local_datasets(esgf_data_dir)
135 changes: 110 additions & 25 deletions docs/how-to-guides/running-metrics-locally.py
@@ -17,47 +17,134 @@
# # Testing metric providers locally
# Metric providers can be run locally without requiring the rest of the REF infrastructure.
# This is useful for testing and debugging metrics.

#
# Running a metric locally requires that the target REF metrics package, e.g. `ref-metrics-example`,
# and its dependencies are installed in the current Python environment.
#
# This guide will walk you through how to run a metric provider locally.


# %%
# !pip install prettyprinter

# %% tags=["remove_input"]
import json
import pathlib
from pathlib import Path

import attrs
import pandas as pd
import prettyprinter
import ref_metrics_example
from ref_core.datasets import SourceDatasetType
from ref_core.executor import run_metric
from ref_core.metrics import Configuration, TriggerInfo

from ref.env import env
from ref.cli.config import load_config
from ref.database import Database
from ref.datasets import get_dataset_adapter
from ref.provider_registry import ProviderRegistry
from ref.solver import MetricSolver

prettyprinter.install_extras(["attrs"])

# %%
provider = ref_metrics_example.provider
provider

# %% [markdown]
# We select a metric that calculates the annual-mean, global-mean timeseries of a dataset.
#
# The data requirements of this metric filter out all variables except `tas` and `rsut`.
# The `group_by` specification ensures that each execution has a unique combination of
# `source_id`, `variant_id`, `variable_id` and `experiment_id` values.

# %%
metric = provider.get("global-mean-timeseries")

# %%
prettyprinter.pprint(metric.data_requirements[0])

# %% tags=["hide_code"]
config = load_config()
db = Database.from_config(config)

# %% [markdown]
# Load the data catalog containing the CMIP6 datasets.
# This contains the datasets that have been ingested into the REF database.
# You could also use the `find_local_datasets` function to find local datasets on disk,
# thereby bypassing the need for a database; a sketch of that approach follows the catalog preview below.

# %%
# Load the data catalog containing the ingested CMIP6 datasets
data_catalog = get_dataset_adapter("cmip6").load_catalog(db)
data_catalog.head()
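
# %% [markdown]
# As a database-free sketch of the alternative mentioned above, the adapter can scan a directory
# on disk directly. Here we assume the datasets live in the esgpull-managed data directory
# (as the test suite does); point `find_local_datasets` at whichever directory holds your data.

# %%
import esgpull

from ref.datasets.cmip6 import CMIP6DatasetAdapter

# Directory managed by esgpull; replace with any directory containing CMIP6 datasets
local_data_dir = esgpull.Esgpull().config.paths.data
adapter = CMIP6DatasetAdapter()
adapter.find_local_datasets(local_data_dir).head()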

# %% [markdown]
# The unique combinations of metadata values used by the `group_by` specification are shown below:

# %%
data_catalog[["source_id", "variant_label", "variable_id", "experiment_id"]].drop_duplicates()

# %% [markdown]
#
# ## Metric Executions
#
# A metric execution is a combination of a metric, a provider, and the data needed to run the metric.
#
# The `MetricSolver` is used to determine which metric executions are required given a set of requirements
# and the currently available datasets.
# This doesn't require the use of the REF database.

# %%
# Relative path to some CMIP6 data
example_dataset = (
pathlib.Path(env.path("REF_DATA_ROOT"))
/ "CMIP6"
/ "ScenarioMIP"
/ "CSIRO"
/ "ACCESS-ESM1-5"
/ "ssp126"
/ "r1i1p1f1"
/ "Amon"
/ "tas"
/ "gn"
/ "v20210318"
solver = MetricSolver(
provider_registry=ProviderRegistry(provider),
data_catalog={
SourceDatasetType.CMIP6: data_catalog,
},
)

metric_executions = solver.solve_metric_executions(
metric=provider.get("global-mean-timeseries"), provider=provider
)

# Convert from a generator to a list to inspect the complete set of results
metric_executions = list(metric_executions)
prettyprinter.pprint(metric_executions)

# %% [markdown]
# We get multiple proposed executions.

# %%
configuration = Configuration(output_directory=pathlib.Path("out") / "example" / "example")
trigger = TriggerInfo(dataset=example_dataset)
pd.concat(execution.metric_dataset["cmip6"] for execution in metric_executions)[
["experiment_id", "variable_id"]
].drop_duplicates()

configuration.output_directory.mkdir(exist_ok=True, parents=True)
# %% [markdown]
# Each execution contains a single unique dataset because of the `group_by` definition.
# The data catalog for the metric execution may contain more than one row
# as a dataset may contain multiple files.

# %%
metric_executions[0].metric_dataset["cmip6"].instance_id.unique().tolist()

# %%
metric_executions[0].metric_dataset["cmip6"]
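
# %% [markdown]
# Each `MetricDataset` also has a slug, a SHA1 hash of its constituent dataset collections,
# which uniquely identifies the execution and makes it easier to find in the logs.
# A minimal way to inspect it:

# %%
metric_executions[0].metric_dataset.slug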

# %% [markdown]
#
# ## Metric Definitions
#
# Each metric execution requires a `MetricExecutionDefinition` object.
# This object contains the information about where data should be stored
# and which datasets should be used for the metric calculation.

# %%
definition = metric_executions[0].build_metric_execution_info()
prettyprinter.pprint(definition)

# %%
# Update the output fragment to be a subdirectory of the current working directory
definition = attrs.evolve(definition, output_fragment=Path("out") / definition.output_fragment)
definition.output_fragment

# %% [markdown]
# ## Metric calculations
@@ -68,11 +155,11 @@
# The simplest executor is the `LocalExecutor`.
# This executor runs a given metric synchronously in the current process.
#
# The LocalExectuor is the default executor when using the `ref_core.executor.run_metric` function.
# The `LocalExecutor` is the default executor when using the `ref_core.executor.run_metric` function.
# This can be overridden by specifying the `REF_EXECUTOR` environment variable.
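
# %% [markdown]
# As a sketch of that override (assuming the `LocalExecutor` is registered under the name
# `"local"`; the exact value depends on the executor registry), set the variable before
# calling `run_metric`:

# %%
import os

os.environ["REF_EXECUTOR"] = "local"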

# %%
result = run_metric("global_mean_timeseries", provider, configuration=configuration, trigger=trigger)
result = run_metric("global-mean-timeseries", provider, definition=definition)
result

# %%
@@ -88,9 +175,7 @@
# This will not perform any validation/verification of the output results.

# %%
metric = provider.get("global_mean_timeseries")

direct_result = metric.run(configuration=configuration, trigger=trigger)
direct_result = metric.run(definition=definition)
assert direct_result.successful

direct_result
66 changes: 66 additions & 0 deletions packages/ref-core/src/ref_core/datasets.py
@@ -1,5 +1,8 @@
import enum
import hashlib
from typing import Any

import pandas as pd
from attrs import field, frozen


@@ -48,3 +51,66 @@ class FacetFilter:
If true (default), datasets that match the filter will be kept; otherwise they will be removed.
"""


@frozen
class DatasetCollection:
"""
Group of datasets required for a given metric execution for a specific source dataset type.
"""

datasets: pd.DataFrame
slug_column: str

def __getattr__(self, item: str) -> Any:
return getattr(self.datasets, item)

def __getitem__(self, item: str) -> Any:
return self.datasets[item]

def __hash__(self) -> int:
# This hashes each item individually and sums them so order doesn't matter
return int(pd.util.hash_pandas_object(self.datasets[self.slug_column]).sum())

def __eq__(self, other: object) -> bool:
return self.__hash__() == other.__hash__()


class MetricDataset:
"""
The complete set of datasets required for a metric execution.
This may cover multiple source dataset types.
"""

def __init__(self, collection: dict[SourceDatasetType | str, DatasetCollection]):
self._collection = collection

def __getitem__(self, key: SourceDatasetType | str) -> DatasetCollection:
if isinstance(key, str):
key = SourceDatasetType(key)
return self._collection[key]

def __hash__(self) -> int:
return hash(self.slug)

@property
def slug(self) -> str:
"""
Unique identifier for the collection
A SHA1 hash is calculated from the combination of the hashes of the individual collections.
The value isn't reversible but can be used to uniquely identify the aggregate of the
collections.
Returns
-------
:
SHA1 hash of the collections
"""
# The dataset collection hashes are reproducible,
# so we can use them to hash the metric dataset.
# This isn't explicitly true for all Python hashes
hash_sum = sum(hash(item) for item in self._collection.values())
hash_bytes = hash_sum.to_bytes(16, "little", signed=True)
return hashlib.sha1(hash_bytes).hexdigest() # noqa: S324
4 changes: 2 additions & 2 deletions packages/ref-core/src/ref_core/exceptions.py
@@ -11,8 +11,8 @@ class RefException(Exception):
class InvalidMetricException(RefException):
"""Exception raised when an invalid metric is registered"""

def __init__(self, metric: Any) -> None:
message = f"Invalid metric: '{metric}'\n" "Metrics must be an instance of the 'Metric' class"
def __init__(self, metric: Any, message: str) -> None:
message = f"Invalid metric: '{metric}'\n {message}"

super().__init__(message)

31 changes: 15 additions & 16 deletions packages/ref-core/src/ref_core/executor/__init__.py
@@ -13,10 +13,10 @@
"""

import os
from typing import Any, Protocol, runtime_checkable
from typing import Protocol, runtime_checkable

from ref_core.executor.local import LocalExecutor
from ref_core.metrics import Configuration, Metric, MetricResult, TriggerInfo
from ref_core.metrics import Metric, MetricExecutionDefinition, MetricResult
from ref_core.providers import MetricsProvider


@@ -35,24 +35,18 @@ class Executor(Protocol):

name: str

def run_metric(
self, metric: Metric, configuration: Configuration, trigger: TriggerInfo | None, **kwargs: Any
) -> MetricResult:
def run_metric(self, metric: Metric, definition: MetricExecutionDefinition) -> MetricResult:
"""
Execute a metric
Parameters
----------
metric
Metric to run
configuration
Configuration to run the metric with
trigger
Information about the dataset that triggered the metric run
definition
Definition of the information needed to execute a metric
TODO: The optionality of this parameter is a placeholder and will be expanded in the future.
kwargs
Additional keyword arguments for the executor
This
Returns
-------
@@ -115,7 +109,9 @@ def get(self, name: str) -> Executor:
get_executor = _default_manager.get


def run_metric(metric_name: str, /, metrics_provider: MetricsProvider, **kwargs: Any) -> MetricResult:
def run_metric(
metric_name: str, /, metrics_provider: MetricsProvider, definition: MetricExecutionDefinition
) -> MetricResult:
"""
Run a metric using the default executor
@@ -130,8 +126,11 @@ def run_metric(metric_name: str, /, metrics_provider: MetricsProvider, **kwargs:
Name of the metric to run.
metrics_provider
Provider from where to retrieve the metric
kwargs
Additional options passed to the metric executor
definition
Information that describes a given metric execution.
This includes the datasets that are needed to run the metric,
where the output should be stored, and any other information needed to run the metric.
Returns
-------
@@ -143,7 +142,7 @@ def run_metric(metric_name: str, /, metrics_provider: MetricsProvider, **kwargs:
executor = get_executor(executor_name)
metric = metrics_provider.get(metric_name)

result = executor.run_metric(metric, **kwargs)
result = executor.run_metric(metric, definition)

# TODO: Validate the result
# TODO: Log the result
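
# A minimal sketch of an Executor satisfying the Protocol above (illustrative only; the class
# and registry name below are hypothetical, not part of the library). It only needs a `name`
# attribute and a `run_metric` method:
class InlineExecutor:
    """Run a metric synchronously in the current process."""

    name = "inline"

    def run_metric(self, metric: Metric, definition: MetricExecutionDefinition) -> MetricResult:
        # Delegate directly to the metric; no isolation or validation of the result
        return metric.run(definition=definition)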