From 69f46272123d4c4d1ed7d7e76b5886ee598e9943 Mon Sep 17 00:00:00 2001
From: Jared Lewis
Date: Mon, 9 Dec 2024 20:35:50 +1100
Subject: [PATCH 1/4] feat: Fixes from ingesting more datasets

---
 packages/ref/src/ref/cli/datasets.py      | 22 ++++++++---
 packages/ref/src/ref/datasets/__init__.py |  6 +--
 packages/ref/src/ref/datasets/base.py     | 46 +++++++++++++++++++----
 packages/ref/src/ref/datasets/cmip6.py    | 46 +++++++++++++++++++++--
 4 files changed, 100 insertions(+), 20 deletions(-)

diff --git a/packages/ref/src/ref/cli/datasets.py b/packages/ref/src/ref/cli/datasets.py
index eea11b0..da0ba9c 100644
--- a/packages/ref/src/ref/cli/datasets.py
+++ b/packages/ref/src/ref/cli/datasets.py
@@ -104,12 +104,16 @@ def list_columns(
 
 
 @app.command()
-def ingest(
+def ingest(  # noqa: PLR0913
     ctx: typer.Context,
     file_or_directory: Path,
-    source_type: SourceDatasetType = typer.Option(help="Type of source dataset"),
-    solve: bool = typer.Option(False, help="Run metrics after ingestion"),
-    dry_run: bool = typer.Option(False, help="Do not execute any metrics"),
+    source_type: Annotated[SourceDatasetType, typer.Option(help="Type of source dataset")],
+    solve: Annotated[bool, typer.Option(help="Solve for new metric executions after ingestion")] = False,
+    dry_run: Annotated[bool, typer.Option(help="Do not ingest datasets into the database")] = False,
+    n_jobs: Annotated[int | None, typer.Option(help="Number of jobs to run in parallel")] = None,
+    skip_invalid: Annotated[
+        bool, typer.Option(help="Ignore (but log) any datasets that don't pass validation")
+    ] = False,
 ) -> None:
     """
     Ingest a dataset
@@ -119,9 +123,15 @@ def ingest(
     config = ctx.obj.config
     db = ctx.obj.database
 
+    file_or_directory = Path(file_or_directory).expanduser()
     logger.info(f"ingesting {file_or_directory}")
 
-    adapter = get_dataset_adapter(source_type.value)
+    kwargs = {}
+
+    if n_jobs is not None:
+        kwargs["n_jobs"] = n_jobs
+
+    adapter = get_dataset_adapter(source_type.value, **kwargs)
 
     # Create a data catalog from the specified file or directory
     if not file_or_directory.exists():
@@ -129,7 +139,7 @@ def ingest(
         raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), file_or_directory)
 
     data_catalog = adapter.find_local_datasets(file_or_directory)
-    adapter.validate_data_catalog(data_catalog)
+    data_catalog = adapter.validate_data_catalog(data_catalog, skip_invalid=skip_invalid)
 
     logger.info(
         f"Found {len(data_catalog)} files for {len(data_catalog[adapter.slug_column].unique())} datasets"
diff --git a/packages/ref/src/ref/datasets/__init__.py b/packages/ref/src/ref/datasets/__init__.py
index e90fd00..bd44612 100644
--- a/packages/ref/src/ref/datasets/__init__.py
+++ b/packages/ref/src/ref/datasets/__init__.py
@@ -2,7 +2,7 @@
 Dataset handling utilities
 """
 
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 from ref_core.datasets import SourceDatasetType
 
@@ -10,7 +10,7 @@
     from ref.datasets.base import DatasetAdapter
 
 
-def get_dataset_adapter(source_type: str) -> "DatasetAdapter":
+def get_dataset_adapter(source_type: str, **kwargs: Any) -> "DatasetAdapter":
     """
     Get the appropriate adapter for the specified source type
 
@@ -27,6 +27,6 @@ def get_dataset_adapter(source_type: str) -> "DatasetAdapter":
     if source_type.lower() == SourceDatasetType.CMIP6.value:
         from ref.datasets.cmip6 import CMIP6DatasetAdapter
 
-        return CMIP6DatasetAdapter()
+        return CMIP6DatasetAdapter(**kwargs)
     else:
         raise ValueError(f"Unknown source type: {source_type}")
diff --git a/packages/ref/src/ref/datasets/base.py b/packages/ref/src/ref/datasets/base.py
index f1a1217..ad36b28 100644
--- a/packages/ref/src/ref/datasets/base.py
+++ b/packages/ref/src/ref/datasets/base.py
@@ -2,12 +2,32 @@
 from typing import Protocol
 
 import pandas as pd
+from loguru import logger
 
 from ref.config import Config
 from ref.database import Database
 from ref.models.dataset import Dataset
 
 
+def _log_duplicate_metadata(data_catalog, unique_metadata):
+    # Drop out the rows where the values are the same
+    invalid_datasets = unique_metadata[unique_metadata.gt(1).any(axis=1)]
+    # Drop out the columns where the values are the same
+    invalid_datasets = invalid_datasets[invalid_datasets.columns[invalid_datasets.gt(1).any(axis=0)]]
+
+    for instance_id in invalid_datasets.index:
+        # Get the columns where the values are different
+        invalid_dataset_nunique = invalid_datasets.loc[instance_id]
+        invalid_dataset_columns = invalid_dataset_nunique[invalid_dataset_nunique.gt(1)].index.tolist()
+        invalid_dataset_columns.append("time_range")
+
+        data_catalog_subset = data_catalog[data_catalog.instance_id == instance_id]
+
+        logger.error(
+            f"Dataset {instance_id} has varying metadata:\n{data_catalog_subset[invalid_dataset_columns]}"
+        )
+
+
 class DatasetAdapter(Protocol):
     """
     An adapter to provide a common interface for different dataset types
@@ -43,7 +63,7 @@ def register_dataset(
         """
         ...
 
-    def validate_data_catalog(self, data_catalog: pd.DataFrame) -> pd.DataFrame:
+    def validate_data_catalog(self, data_catalog: pd.DataFrame, skip_invalid: bool = False) -> pd.DataFrame:
         """
         Validate a data catalog
 
@@ -51,6 +71,13 @@ def validate_data_catalog(self, data_catalog: pd.DataFrame) -> pd.DataFrame:
         ----------
         data_catalog
             Data catalog to validate
+        skip_invalid
+            If True, ignore datasets with invalid metadata and remove them from the resulting data catalog.
+
+        Raises
+        ------
+        ValueError
+            If `skip_invalid` is False (default) and the data catalog contains validation errors.
 
         Returns
         -------
@@ -70,13 +97,16 @@ def validate_data_catalog(self, data_catalog: pd.DataFrame) -> pd.DataFrame:
             data_catalog[list(self.dataset_specific_metadata)].groupby(self.slug_column).nunique()
         )
         if unique_metadata.gt(1).any(axis=1).any():
-            # Drop out the rows where the values are the same
-            invalid_datasets = unique_metadata[unique_metadata.gt(1).any(axis=1)]
-            # Drop out the columns where the values are the same
-            invalid_datasets = invalid_datasets[invalid_datasets.gt(1)].dropna(axis=1)
-            raise ValueError(
-                f"Dataset specific metadata varies by dataset.\nUnique values: {invalid_datasets}"
-            )
+            _log_duplicate_metadata(data_catalog, unique_metadata)
+
+            if skip_invalid:
+                data_catalog = data_catalog[
+                    ~data_catalog[self.slug_column].isin(
+                        unique_metadata[unique_metadata.gt(1).any(axis=1)].index
+                    )
+                ]
+            else:
+                raise ValueError("Dataset specific metadata varies by dataset")
 
         return data_catalog
 
diff --git a/packages/ref/src/ref/datasets/cmip6.py b/packages/ref/src/ref/datasets/cmip6.py
index 4f0a89a..2ef6d3f 100644
--- a/packages/ref/src/ref/datasets/cmip6.py
+++ b/packages/ref/src/ref/datasets/cmip6.py
@@ -22,13 +22,47 @@ def _parse_datetime(dt_str: pd.Series[str]) -> pd.Series[datetime | Any]:
     """
     Pandas tries to coerce everything to their own datetime format, which is not what we want here.
""" + + def _inner(date_string): + if not date_string: + return None + + # Try to parse the date string with and without milliseconds + try: + dt = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S") + except ValueError: + dt = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S.%f") + + return dt + return pd.Series( - [datetime.strptime(dt, "%Y-%m-%d %H:%M:%S") if dt else None for dt in dt_str], + [_inner(dt) for dt in dt_str], index=dt_str.index, dtype="object", ) +def _apply_fixes(data_catalog: pd.DataFrame) -> pd.DataFrame: + def _fix_parent_variant_label(group: pd.DataFrame) -> pd.DataFrame: + if group["parent_variant_label"].nunique() == 1: + return group + group["parent_variant_label"] = group["variant_label"].iloc[0] + + return group + + data_catalog = data_catalog.groupby("instance_id").apply(_fix_parent_variant_label).reset_index(drop=True) + + # EC-Earth3 uses "D" as a suffix for the branch_time_in_child and branch_time_in_parent columns + data_catalog["branch_time_in_child"] = pd.to_numeric( + data_catalog["branch_time_in_child"].astype(str).str.replace("D", ""), errors="raise" + ) + data_catalog["branch_time_in_parent"] = pd.to_numeric( + data_catalog["branch_time_in_parent"].astype(str).str.replace("D", ""), errors="raise" + ) + + return data_catalog + + class CMIP6DatasetAdapter(DatasetAdapter): """ Adapter for CMIP6 datasets @@ -75,6 +109,9 @@ class CMIP6DatasetAdapter(DatasetAdapter): file_specific_metadata = ("start_time", "end_time", "path") + def __init__(self, n_jobs: int = 1): + self.n_jobs = n_jobs + def pretty_subset(self, data_catalog: pd.DataFrame) -> pd.DataFrame: """ Get a subset of the data_catalog to pretty print @@ -127,8 +164,7 @@ def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame: paths=[str(file_or_directory)], depth=10, include_patterns=["*.nc"], - # TODO: This is hardcoded to 1 because of >1 fails during unittests - joblib_parallel_kwargs={"n_jobs": 1}, + joblib_parallel_kwargs={"n_jobs": self.n_jobs}, ).build(parsing_func=ecgtools.parsers.parse_cmip6) datasets = builder.df @@ -153,6 +189,10 @@ def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame: lambda row: "CMIP6." 
+ ".".join([row[item] for item in drs_items]), axis=1 ) + # Temporary fix for some datasets + # TODO: Replace with a standalone package that contains metadata fixes for CMIP6 datasets + datasets = _apply_fixes(datasets).reset_index() + return datasets def register_dataset( From 78127a9c90aa800efc647f587d8c0bc72e58fac4 Mon Sep 17 00:00:00 2001 From: Jared Lewis Date: Mon, 9 Dec 2024 21:05:15 +1100 Subject: [PATCH 2/4] test: Add tests for failing datetimes --- packages/ref/src/ref/datasets/base.py | 10 ++++++--- packages/ref/src/ref/datasets/cmip6.py | 2 +- .../ref/tests/unit/datasets/test_cmip6.py | 15 ++++++++++++- .../ref/tests/unit/datasets/test_datasets.py | 21 ++++++++++++------- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/packages/ref/src/ref/datasets/base.py b/packages/ref/src/ref/datasets/base.py index ad36b28..1551d06 100644 --- a/packages/ref/src/ref/datasets/base.py +++ b/packages/ref/src/ref/datasets/base.py @@ -9,7 +9,9 @@ from ref.models.dataset import Dataset -def _log_duplicate_metadata(data_catalog, unique_metadata): +def _log_duplicate_metadata( + data_catalog: pd.DataFrame, unique_metadata: pd.DataFrame, slug_column: str +) -> None: # Drop out the rows where the values are the same invalid_datasets = unique_metadata[unique_metadata.gt(1).any(axis=1)] # Drop out the columns where the values are the same @@ -19,9 +21,11 @@ def _log_duplicate_metadata(data_catalog, unique_metadata): # Get the columns where the values are different invalid_dataset_nunique = invalid_datasets.loc[instance_id] invalid_dataset_columns = invalid_dataset_nunique[invalid_dataset_nunique.gt(1)].index.tolist() + + # Include time_range in the list of invalid columns to make debugging easier invalid_dataset_columns.append("time_range") - data_catalog_subset = data_catalog[data_catalog.instance_id == instance_id] + data_catalog_subset = data_catalog[data_catalog[slug_column] == instance_id] logger.error( f"Dataset {instance_id} has varying metadata:\n{data_catalog_subset[invalid_dataset_columns]}" @@ -97,7 +101,7 @@ def validate_data_catalog(self, data_catalog: pd.DataFrame, skip_invalid: bool = data_catalog[list(self.dataset_specific_metadata)].groupby(self.slug_column).nunique() ) if unique_metadata.gt(1).any(axis=1).any(): - _log_duplicate_metadata(data_catalog, unique_metadata) + _log_duplicate_metadata(data_catalog, unique_metadata, self.slug_column) if skip_invalid: data_catalog = data_catalog[ diff --git a/packages/ref/src/ref/datasets/cmip6.py b/packages/ref/src/ref/datasets/cmip6.py index 2ef6d3f..bc138a7 100644 --- a/packages/ref/src/ref/datasets/cmip6.py +++ b/packages/ref/src/ref/datasets/cmip6.py @@ -191,7 +191,7 @@ def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame: # Temporary fix for some datasets # TODO: Replace with a standalone package that contains metadata fixes for CMIP6 datasets - datasets = _apply_fixes(datasets).reset_index() + datasets = _apply_fixes(datasets) return datasets diff --git a/packages/ref/tests/unit/datasets/test_cmip6.py b/packages/ref/tests/unit/datasets/test_cmip6.py index 0ae891c..b34fe19 100644 --- a/packages/ref/tests/unit/datasets/test_cmip6.py +++ b/packages/ref/tests/unit/datasets/test_cmip6.py @@ -1,7 +1,9 @@ +import datetime + import pandas as pd import pytest -from ref.datasets.cmip6 import CMIP6DatasetAdapter +from ref.datasets.cmip6 import CMIP6DatasetAdapter, _parse_datetime @pytest.fixture @@ -15,6 +17,17 @@ def check(df: pd.DataFrame, basename: str): return check +def test_parse_datetime(): + 
+    pd.testing.assert_series_equal(
+        _parse_datetime(pd.Series(["2021-01-01 00:00:00", "1850-01-17 00:29:59.999993"])),
+        pd.Series(
+            [datetime.datetime(2021, 1, 1, 0, 0), datetime.datetime(1850, 1, 17, 0, 29, 59, 999993)],
+            dtype="object",
+        ),
+    )
+    pass
+
+
 class TestCMIP6Adapter:
     def test_catalog_empty(self, db):
         adapter = CMIP6DatasetAdapter()
diff --git a/packages/ref/tests/unit/datasets/test_datasets.py b/packages/ref/tests/unit/datasets/test_datasets.py
index 68a2dae..b8bd84e 100644
--- a/packages/ref/tests/unit/datasets/test_datasets.py
+++ b/packages/ref/tests/unit/datasets/test_datasets.py
@@ -11,7 +11,7 @@ class MockDatasetAdapter(DatasetAdapter):
     dataset_model: pd.DataFrame
 
     slug_column: str = "dataset_slug"
-    dataset_specific_metadata: tuple[str, ...] = ("metadata1", "metadata2")
+    dataset_specific_metadata: tuple[str, ...] = ("metadata1", "metadata2", "dataset_slug")
     file_specific_metadata: tuple[str, ...] = ("file_name", "file_size")
 
     def pretty_subset(self, data_catalog: pd.DataFrame) -> pd.DataFrame:
@@ -24,10 +24,11 @@ def find_local_datasets(self, file_or_directory: Path) -> pd.DataFrame:
             "dataset_slug": [f"{file_or_directory.stem}_001", f"{file_or_directory.stem}_001"],
             "metadata1": ["value1", "value1"],
             "metadata2": ["value2", "value2"],
+            "time_range": ["2020-01-01", "2020-01-01"],
             "file_name": [file_or_directory.name, file_or_directory.name + "_2"],
             "file_size": [100, 100],
         }
-        return pd.DataFrame(data).set_index(self.slug_column)
+        return pd.DataFrame(data)
 
     def register_dataset(self, config, db, data_catalog_dataset: pd.DataFrame) -> pd.DataFrame | None:
         # Returning the input as a stand-in "registered" dataset
@@ -60,20 +61,26 @@ def test_validate_data_catalog_missing_columns():
         adapter.validate_data_catalog(data_catalog.drop(columns=["file_name"]))
 
 
-def test_validate_data_catalog_metadata_variance():
+def test_validate_data_catalog_metadata_variance(caplog):
     adapter = MockDatasetAdapter()
     data_catalog = adapter.find_local_datasets(Path("path/to/dataset"))
 
     # file_name differs between datasets
-    adapter.dataset_specific_metadata = ("metadata1", "metadata2", "file_name")
+    adapter.dataset_specific_metadata = (*adapter.dataset_specific_metadata, "file_name")
 
-    exp_df = pd.DataFrame(columns=["file_name"], index=["dataset_001"], data=[2])
-    exp_df.index.name = "dataset_slug"
+    exp_message = "Dataset dataset_001 has varying metadata:\n  file_name  time_range\n0    dataset  2020-01-01\n1  dataset_2  2020-01-01"  # noqa: E501
     with pytest.raises(
         ValueError,
-        match=f"Dataset specific metadata varies by dataset.\nUnique values: {exp_df}",
+        match="Dataset specific metadata varies by dataset",
     ):
         adapter.validate_data_catalog(data_catalog)
+    assert len(caplog.records) == 1
+    assert caplog.records[0].message == exp_message
+
+    caplog.clear()
+    assert len(adapter.validate_data_catalog(data_catalog, skip_invalid=True)) == 0
+    assert len(caplog.records) == 1
+    assert caplog.records[0].message == exp_message
 
 
 @pytest.mark.parametrize(

From 0f9d7d4008a7ac2d803ccf61c5539fb268eab1e7 Mon Sep 17 00:00:00 2001
From: Jared Lewis
Date: Mon, 9 Dec 2024 21:15:03 +1100
Subject: [PATCH 3/4] docs: Changelog

---
 changelog/36.feature.md                        | 3 +++
 packages/ref/tests/unit/datasets/test_cmip6.py | 1 -
 2 files changed, 3 insertions(+), 1 deletion(-)
 create mode 100644 changelog/36.feature.md

diff --git a/changelog/36.feature.md b/changelog/36.feature.md
new file mode 100644
index 0000000..045e7a2
--- /dev/null
+++ b/changelog/36.feature.md
@@ -0,0 +1,3 @@
+Added options to skip any datasets that fail validation and to specify the number of cores to
+use when ingesting datasets.
+These behaviours can be opted into using the `--skip-invalid` and `--n-jobs` options respectively.
diff --git a/packages/ref/tests/unit/datasets/test_cmip6.py b/packages/ref/tests/unit/datasets/test_cmip6.py
index b34fe19..5216681 100644
--- a/packages/ref/tests/unit/datasets/test_cmip6.py
+++ b/packages/ref/tests/unit/datasets/test_cmip6.py
@@ -25,7 +25,6 @@ def test_parse_datetime():
             dtype="object",
         ),
     )
-    pass
 
 
 class TestCMIP6Adapter:

From b36e493fdfded6a317b1aad0810ca15c9340dae4 Mon Sep 17 00:00:00 2001
From: Jared Lewis
Date: Mon, 9 Dec 2024 21:16:46 +1100
Subject: [PATCH 4/4] chore: Fix mypy

---
 packages/ref/src/ref/datasets/cmip6.py         | 2 +-
 packages/ref/tests/unit/datasets/test_cmip6.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/packages/ref/src/ref/datasets/cmip6.py b/packages/ref/src/ref/datasets/cmip6.py
index bc138a7..1d19e8c 100644
--- a/packages/ref/src/ref/datasets/cmip6.py
+++ b/packages/ref/src/ref/datasets/cmip6.py
@@ -23,7 +23,7 @@ def _parse_datetime(dt_str: pd.Series[str]) -> pd.Series[datetime | Any]:
     Pandas tries to coerce everything to their own datetime format, which is not what we want here.
     """
 
-    def _inner(date_string):
+    def _inner(date_string: str | None) -> datetime | None:
         if not date_string:
             return None
 
diff --git a/packages/ref/tests/unit/datasets/test_cmip6.py b/packages/ref/tests/unit/datasets/test_cmip6.py
index 5216681..5899a5d 100644
--- a/packages/ref/tests/unit/datasets/test_cmip6.py
+++ b/packages/ref/tests/unit/datasets/test_cmip6.py
@@ -19,9 +19,9 @@ def check(df: pd.DataFrame, basename: str):
 
 def test_parse_datetime():
     pd.testing.assert_series_equal(
-        _parse_datetime(pd.Series(["2021-01-01 00:00:00", "1850-01-17 00:29:59.999993"])),
+        _parse_datetime(pd.Series(["2021-01-01 00:00:00", "1850-01-17 00:29:59.999993", None])),
         pd.Series(
-            [datetime.datetime(2021, 1, 1, 0, 0), datetime.datetime(1850, 1, 17, 0, 29, 59, 999993)],
+            [datetime.datetime(2021, 1, 1, 0, 0), datetime.datetime(1850, 1, 17, 0, 29, 59, 999993), None],
             dtype="object",
         ),
     )
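
Usage sketch (illustrative only, not part of the patches above): assuming `SourceDatasetType.CMIP6.value` resolves to "cmip6" and using a hypothetical local directory of CMIP6 netCDF files, the `n_jobs` and `skip_invalid` options introduced in this series can be exercised directly through the Python API added/changed here:

    from pathlib import Path

    from ref.datasets import get_dataset_adapter

    # Hypothetical input directory; any folder containing CMIP6 *.nc files would do
    data_dir = Path("~/data/CMIP6").expanduser()

    # Assumes SourceDatasetType.CMIP6.value == "cmip6"; n_jobs is forwarded to
    # CMIP6DatasetAdapter and on to ecgtools' joblib workers
    adapter = get_dataset_adapter("cmip6", n_jobs=4)

    # Build the data catalog, then drop (but log) datasets whose metadata varies
    data_catalog = adapter.find_local_datasets(data_dir)
    data_catalog = adapter.validate_data_catalog(data_catalog, skip_invalid=True)

    print(f"{len(data_catalog)} valid files found")

The equivalent CLI behaviour is exposed on the `ingest` command via the `--n-jobs` and `--skip-invalid` flags described in the changelog entry.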