feat: Add metric and provider tables #31

Merged · 16 commits · Dec 5, 2024
2 changes: 2 additions & 0 deletions changelog/31.feature.md
@@ -0,0 +1,2 @@
Add database structures to represent a metric execution.
We record previous executions of a metric to minimise the number of times we need to run metrics.
18 changes: 18 additions & 0 deletions conftest.py
@@ -10,6 +10,7 @@
import pandas as pd
import pytest

from ref.config import Config
from ref.datasets.cmip6 import CMIP6DatasetAdapter


@@ -24,3 +25,20 @@ def esgf_data_dir() -> Path:
def cmip6_data_catalog(esgf_data_dir) -> pd.DataFrame:
adapter = CMIP6DatasetAdapter()
return adapter.find_local_datasets(esgf_data_dir)


@pytest.fixture(autouse=True)
def config(tmp_path, monkeypatch) -> Config:
monkeypatch.setenv("REF_CONFIGURATION", str(tmp_path / "ref"))

# Uses the default configuration
cfg = Config.load(tmp_path / "ref" / "ref.toml")

# Allow adding datasets from outside the tree for testing
cfg.paths.allow_out_of_tree_datasets = True

# Use a SQLite in-memory database for testing
# cfg.db.database_url = "sqlite:///:memory:"
cfg.save()

return cfg
9 changes: 6 additions & 3 deletions packages/ref-core/src/ref_core/datasets.py
@@ -61,11 +61,14 @@ class DatasetCollection:

datasets: pd.DataFrame
slug_column: str
"""
Column in datasets that contains the unique identifier for the dataset
"""

def __getattr__(self, item: str) -> Any:
return getattr(self.datasets, item)

def __getitem__(self, item: str) -> Any:
def __getitem__(self, item: str | list[str]) -> Any:
return self.datasets[item]

def __hash__(self) -> int:
Expand All @@ -92,10 +95,10 @@ def __getitem__(self, key: SourceDatasetType | str) -> DatasetCollection:
return self._collection[key]

def __hash__(self) -> int:
return hash(self.slug)
return hash(self.hash)

@property
def slug(self) -> str:
def hash(self) -> str:
"""
Unique identifier for the collection

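The rename from `slug` to `hash` makes it clearer that the collection identifier is derived from the datasets it contains rather than being a human-readable name. Below is a minimal sketch of how such a hash could be computed from the slug column; the helper name and the choice of SHA-1 are assumptions for illustration, not code from this PR.

```python
import hashlib

import pandas as pd


def collection_hash(datasets: pd.DataFrame, slug_column: str) -> str:
    """Hypothetical helper: hash the sorted dataset identifiers in a collection."""
    slugs = sorted(datasets[slug_column].unique())
    return hashlib.sha1(",".join(slugs).encode()).hexdigest()


# Usage sketch: catalogs containing the same datasets produce the same hash,
# so a collection can be matched against previous metric executions.
df = pd.DataFrame({"instance_id": ["CMIP6.a.b.c", "CMIP6.d.e.f"]})
print(collection_hash(df, "instance_id"))
```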
8 changes: 7 additions & 1 deletion packages/ref-core/src/ref_core/metrics.py
@@ -24,9 +24,12 @@ class MetricExecutionDefinition:
This is relative to the temporary directory which may differ by executor.
"""

slug: str
key: str
"""
A unique identifier for the metric execution

The key is a hash of the group-by values for the datasets used in the metric execution.
Duplicate keys occur when new datasets become available that match the same group-by values.
"""

metric_dataset: MetricDataset
@@ -123,6 +126,9 @@ class DataRequirement:
Each group will contain a unique combination of values from the metadata fields,
and will result in a separate execution of the metric.
If `group_by=None`, all datasets will be processed together as a single execution.

The unique values of the group-by fields are used to create a unique key for the metric execution.
Changing the value of `group_by` may invalidate all previous metric executions.
"""

constraints: tuple[GroupConstraint, ...] = field(factory=tuple)
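As the docstrings above describe, the execution key is derived from the group-by metadata values, so the same combination of values always maps to the same execution. A rough sketch of that idea follows; the helper and the example metadata fields are hypothetical, not the PR's implementation.

```python
import hashlib

import pandas as pd


def execution_key(group_values: dict[str, str]) -> str:
    """Hypothetical: derive a stable key from the group-by metadata values."""
    parts = [f"{name}={value}" for name, value in sorted(group_values.items())]
    return hashlib.sha1("/".join(parts).encode()).hexdigest()


# Each unique combination of group-by values becomes one metric execution.
catalog = pd.DataFrame(
    {
        "source_id": ["ACCESS-ESM1-5", "ACCESS-ESM1-5", "CanESM5"],
        "experiment_id": ["historical", "ssp126", "historical"],
        "instance_id": ["ds1", "ds2", "ds3"],
    }
)
for values, group in catalog.groupby(["source_id", "experiment_id"]):
    key = execution_key(dict(zip(["source_id", "experiment_id"], values)))
    print(key, list(group["instance_id"]))
```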
2 changes: 1 addition & 1 deletion packages/ref-core/tests/conftest.py
@@ -52,5 +52,5 @@ def mock_metric(tmp_path) -> MockMetric:
@pytest.fixture
def metric_definition(tmp_path) -> MetricExecutionDefinition:
return MetricExecutionDefinition(
output_fragment=tmp_path, slug="mocked-metric-slug", metric_dataset=MetricDataset({})
output_fragment=tmp_path, key="mocked-metric-slug", metric_dataset=MetricDataset({})
)
2 changes: 1 addition & 1 deletion packages/ref-core/tests/unit/test_datasets.py
@@ -42,7 +42,7 @@ def test_hash(self, metric_dataset, cmip6_data_catalog):
)

def test_slug(self, metric_dataset):
assert metric_dataset.slug == "69165f25de10cc0b682e6d1acd1026c0ad27448c"
assert metric_dataset.hash == "69165f25de10cc0b682e6d1acd1026c0ad27448c"


class TestDatasetCollection:
2 changes: 1 addition & 1 deletion packages/ref-core/tests/unit/test_metrics.py
@@ -9,7 +9,7 @@
class TestMetricResult:
def test_build(self, tmp_path):
config = MetricExecutionDefinition(
output_fragment=tmp_path, slug="mocked-metric-slug", metric_dataset=None
output_fragment=tmp_path, key="mocked-metric-slug", metric_dataset=None
)
result = MetricResult.build(config, {"data": "value"})

@@ -66,7 +66,7 @@ def format_cmec_output_bundle(dataset: xr.Dataset) -> dict[str, Any]:
"version": "v1",
},
"RESULTS": {
dataset.attrs["source_id"]: {"global": {"tas": ""}},
dataset.attrs["source_id"]: {"global": {"tas": 0}},
},
}

2 changes: 1 addition & 1 deletion packages/ref-metrics-example/tests/unit/test_metrics.py
@@ -31,7 +31,7 @@ def test_example_metric(tmp_path, metric_dataset, cmip6_data_catalog):

configuration = MetricExecutionDefinition(
output_fragment=tmp_path,
slug="global_mean_timeseries",
key="global_mean_timeseries",
metric_dataset=MetricDataset(
{
SourceDatasetType.CMIP6: DatasetCollection(ds, "instance_id"),
121 changes: 121 additions & 0 deletions packages/ref/alembic/versions/4b95a617184e_add_metrics.py
@@ -0,0 +1,121 @@
"""add_metrics

Revision ID: 4b95a617184e
Revises: ea2aa1134cb3
Create Date: 2024-12-05 22:10:07.971615

"""

from collections.abc import Sequence
from typing import Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "4b95a617184e"
down_revision: Union[str, None] = "ea2aa1134cb3"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"provider",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("slug", sa.String(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column("version", sa.String(), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("slug"),
)
op.create_table(
"metric",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("slug", sa.String(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column("provider_id", sa.Integer(), nullable=False),
sa.Column("enabled", sa.Boolean(), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
sa.ForeignKeyConstraint(
["provider_id"],
["provider.id"],
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("provider_id", "slug", name="metric_ident"),
sa.UniqueConstraint("slug"),
)
op.create_table(
"metric_execution",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("retracted", sa.Boolean(), nullable=False),
sa.Column("metric_id", sa.Integer(), nullable=False),
sa.Column("key", sa.String(), nullable=False),
sa.Column("dirty", sa.Boolean(), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
sa.ForeignKeyConstraint(
["metric_id"],
["metric.id"],
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("metric_id", "key", name="metric_execution_ident"),
)
with op.batch_alter_table("metric_execution", schema=None) as batch_op:
batch_op.create_index(batch_op.f("ix_metric_execution_key"), ["key"], unique=False)

op.create_table(
"metric_execution_result",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("metric_execution_id", sa.Integer(), nullable=False),
sa.Column("dataset_hash", sa.String(), nullable=False),
sa.Column("successful", sa.Boolean(), nullable=True),
sa.Column("path", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("(CURRENT_TIMESTAMP)"), nullable=False),
sa.ForeignKeyConstraint(
["metric_execution_id"],
["metric_execution.id"],
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("metric_execution_id", "dataset_hash", name="metric_execution_result_ident"),
)
with op.batch_alter_table("metric_execution_result", schema=None) as batch_op:
batch_op.create_index(
batch_op.f("ix_metric_execution_result_dataset_hash"), ["dataset_hash"], unique=False
)

op.create_table(
"metric_execution_result_dataset",
sa.Column("metric_execution_result_id", sa.Integer(), nullable=True),
sa.Column("dataset_id", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(
["dataset_id"],
["dataset.id"],
),
sa.ForeignKeyConstraint(
["metric_execution_result_id"],
["metric_execution_result.id"],
),
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("metric_execution_result_dataset")
with op.batch_alter_table("metric_execution_result", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_metric_execution_result_dataset_hash"))

op.drop_table("metric_execution_result")
with op.batch_alter_table("metric_execution", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_metric_execution_key"))

op.drop_table("metric_execution")
op.drop_table("metric")
op.drop_table("provider")
# ### end Alembic commands ###
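A sketch of how the tables created by this migration could be used to skip re-running a metric when a result already exists for the same execution key and dataset hash. The table and column names follow the migration above, but the database URL, lookup values, and the query itself are illustrative, not code from this PR.

```python
import sqlalchemy as sa

# Reflect the tables created by the migration and look up a previous result.
engine = sa.create_engine("sqlite:///ref.db")  # assumed database URL
metadata = sa.MetaData()
metric_execution = sa.Table("metric_execution", metadata, autoload_with=engine)
metric_execution_result = sa.Table("metric_execution_result", metadata, autoload_with=engine)

stmt = (
    sa.select(metric_execution_result.c.path)
    .join(
        metric_execution,
        metric_execution_result.c.metric_execution_id == metric_execution.c.id,
    )
    .where(
        metric_execution.c.key == "some-execution-key",  # hypothetical key
        metric_execution_result.c.dataset_hash == "some-dataset-hash",  # hypothetical hash
        metric_execution_result.c.successful.is_(True),
    )
)

with engine.connect() as conn:
    existing = conn.execute(stmt).first()
    if existing is not None:
        print(f"Result already computed at {existing.path}; no need to re-run")
```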
24 changes: 3 additions & 21 deletions packages/ref/conftest.py
@@ -3,7 +3,6 @@
from pytest_regressions.data_regression import RegressionYamlDumper
from yaml.representer import SafeRepresenter

from ref.config import Config
from ref.database import Database
from ref.datasets.cmip6 import CMIP6DatasetAdapter

@@ -20,23 +19,6 @@
)


@pytest.fixture(autouse=True)
def config(tmp_path, monkeypatch) -> Config:
monkeypatch.setenv("REF_CONFIGURATION", str(tmp_path / "ref"))

# Uses the default configuration
cfg = Config.load(tmp_path / "ref" / "ref.toml")

# Allow adding datasets from outside the tree for testing
cfg.paths.allow_out_of_tree_datasets = True

# Use a SQLite in-memory database for testing
# cfg.db.database_url = "sqlite:///:memory:"
cfg.save()

return cfg


@pytest.fixture
def db(config) -> Database:
return Database.from_config(config, run_migrations=True)
@@ -50,8 +32,8 @@ def db_seeded(config, cmip6_data_catalog) -> Database:

# Seed with all the datasets in the ESGF data directory
# This includes datasets which span multiple file and until 2300
for instance_id, data_catalog_dataset in cmip6_data_catalog.groupby(adapter.slug_column):
adapter.register_dataset(config, database, data_catalog_dataset)
with database.session.begin():
for instance_id, data_catalog_dataset in cmip6_data_catalog.groupby(adapter.slug_column):
adapter.register_dataset(config, database, data_catalog_dataset)

database.session.commit()
return database
3 changes: 2 additions & 1 deletion packages/ref/src/ref/cli/solve.py
@@ -16,4 +16,5 @@ def solve(
This may trigger a number of additional calculations depending on what data has been ingested
since the last solve.
"""
solve_metrics(ctx.obj.database, dry_run=dry_run)
with ctx.obj.database.session.begin():
solve_metrics(ctx.obj.database, dry_run=dry_run)
5 changes: 4 additions & 1 deletion packages/ref/src/ref/models/__init__.py
@@ -8,8 +8,11 @@

from ref.models.base import Base
from ref.models.dataset import Dataset
from ref.models.metric import Metric
from ref.models.metric_execution import MetricExecution
from ref.models.provider import Provider

Table = TypeVar("Table", bound=Base)


__all__ = ["Base", "Dataset", "Table"]
__all__ = ["Base", "Dataset", "Table", "Metric", "MetricExecution", "Provider"]
21 changes: 20 additions & 1 deletion packages/ref/src/ref/models/base.py
@@ -1,4 +1,7 @@
from sqlalchemy.orm import DeclarativeBase
import datetime

from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
@@ -7,3 +10,19 @@ class Base(DeclarativeBase):
"""

pass


class CreatedUpdatedMixin:
"""
Mixin for models that have created_at and updated_at fields
"""

created_at: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
"""
When the record was added to the database
"""

updated_at: Mapped[datetime.datetime] = mapped_column(server_default=func.now(), onupdate=func.now())
"""
When the record was last updated.
"""