From 4adb50162e1d3b72df2e86a49d662b7e77ee118e Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Tue, 22 Oct 2024 13:31:48 -0400 Subject: [PATCH 01/11] working checkin --- ilamb3/analysis/relationship.py | 2 +- ilamb3/exceptions.py | 28 +++++++ ilamb3/models/__init__.py | 3 +- ilamb3/models/base.py | 135 ++++++++++++++++++++++++++++++ ilamb3/models/esgf.py | 28 +++++-- ilamb3/workflow.py | 142 ++++++++++++++++++++++++++++++++ pyproject.toml | 5 +- 7 files changed, 330 insertions(+), 13 deletions(-) create mode 100644 ilamb3/models/base.py create mode 100644 ilamb3/workflow.py diff --git a/ilamb3/analysis/relationship.py b/ilamb3/analysis/relationship.py index c53edbf..3a22d41 100644 --- a/ilamb3/analysis/relationship.py +++ b/ilamb3/analysis/relationship.py @@ -287,7 +287,7 @@ def __call__( analysis_name, f"Score {var_dep} vs {var_ind}", "score", - "", + "1", score, ] ) diff --git a/ilamb3/exceptions.py b/ilamb3/exceptions.py index a1d1230..31e78b7 100644 --- a/ilamb3/exceptions.py +++ b/ilamb3/exceptions.py @@ -27,3 +27,31 @@ class NoDatabaseEntry(ILAMBException): class NoSiteDimension(ILAMBException): """The dataset/dataarray does not contain a clear site dimension.""" + + +class AnalysisFailure(ILAMBException): + """ + The analysis function you were running threw an exception. + + Parameters + ---------- + analysis : str + The name of the analysis which has failed. + variable : str + The name of the variable associated with this analysis. + source : str + The name of the source of the reference data. + model : str + The name of the model whose analysis failed. + """ + + def __init__( + self, analysis: str, variable: str, source: str, model: str + ): # numpydoc ignore=GL08 + self.analysis = analysis + self.variable = variable + self.source = source + self.model = model + + def __str__(self): # numpydoc ignore=GL08 + return f"The '{self.analysis}' analysis failed for '{self.variable} | {self.source}' against '{self.model}'" diff --git a/ilamb3/models/__init__.py b/ilamb3/models/__init__.py index 6b615f1..5243264 100644 --- a/ilamb3/models/__init__.py +++ b/ilamb3/models/__init__.py @@ -1,5 +1,6 @@ """Model abstractions used in ILAMB.""" +from ilamb3.models.base import Model from ilamb3.models.esgf import ModelESGF -__all__ = ["ModelESGF"] +__all__ = ["Model", "ModelESGF"] diff --git a/ilamb3/models/base.py b/ilamb3/models/base.py new file mode 100644 index 0000000..3cdd340 --- /dev/null +++ b/ilamb3/models/base.py @@ -0,0 +1,135 @@ +"""A class for abstracting the variables associated with an earth system model.""" + +import os +import re +from collections.abc import Sequence +from dataclasses import dataclass, field +from pathlib import Path +from typing import Self + +import xarray as xr +from esmodels.exceptions import ParsingError, VarNotInModel + + +def free_symbols(expression: str) -> list[str]: + """ + Return the free symbols of the expression. + + Parameters + ---------- + expression : str + The expression. + + Returns + ------- + list[str] + A list of the free symbols in the expression. + """ + return re.findall(r"\w+", expression) + + +@dataclass +class Model: + + name: str + color: tuple[float] = (0.0, 0.0, 0.0) + variables: dict = field(init=False, repr=False, default_factory=dict) + synonyms: dict = field(init=False, repr=False, default_factory=dict) + + def find_files( + self, + path: str | Sequence[str], + ) -> Self: + """ + Find netCDF files and variables in the provided path(s). 
+ + Parameters + ---------- + path : str or a Sequence of str + The path(s) in which to search for netCDF files. + """ + if isinstance(path, str): + path = [path] + # Walk through all the paths given (following links)... + for file_path in path: + for root, _, files in os.walk(file_path, followlinks=True): + for filename in files: + filepath = Path(root) / Path(filename) + if not filepath.suffix == ".nc": + continue + # Open the dataset and for each variable, insert a key into the + # variables dict appending the path where it was found. + with xr.open_dataset(filepath) as ds: + for key in ds.variables.keys(): + if key not in self.variables: + self.variables[key] = [] + self.variables[key].append(str(filepath)) + # Sort the file lists to make opening easier later + self.variables = {var: sorted(paths) for var, paths in self.variables.items()} + return self + + def add_synonym(self, expression: str) -> Self: + """ + Associate variable synonyms or expressions with this model. + + Parameters + ---------- + expression : str + An expression for the synonym of the form, `new_var = var_in_model`. + """ + assert isinstance(expression, str) + if expression.count("=") != 1: + raise ValueError("Add a synonym by providing a string of the form 'a = b'") + key, expression = expression.split("=") + # Check that the free symbols of the expression are variables in this model + for arg in free_symbols(expression): + assert arg in self.variables + self.synonyms[key.strip()] = expression.strip() + return self + + def get_variable( + self, + name: str, + ) -> xr.Dataset: + """ + Search the model database for the specified variable. + + Parameters + ---------- + name : str + The name of the variable we wish to load. + + Returns + ------- + xr.Dataset + The xarray Dataset. 
+ """ + + def _load(symbols: list[str]): + dsd = ( + { + sym: ( + xr.open_dataset(self.variables[sym][0]) + if len(self.variables[sym]) == 1 + else xr.open_mfdataset(self.variables[sym]) + )[sym] + for sym in symbols + }, + ) + return dsd[0] + + if name in self.variables: + return xr.Dataset(_load([name])) + if name in self.synonyms: + symbols = free_symbols(self.synonyms[name]) + ds = _load(symbols) + expr = str(self.synonyms[name]) + for sym in symbols: + expr = expr.replace(sym, f"ds['{sym}']") + try: + ds = xr.Dataset({name: eval(expr)}) + except Exception as ex: + print(ex) + raise ParsingError(self.name, name, self.synonyms[name]) + return ds + raise VarNotInModel(name, self.synonyms[name]) diff --git a/ilamb3/models/esgf.py b/ilamb3/models/esgf.py index 755e17f..148b76b 100644 --- a/ilamb3/models/esgf.py +++ b/ilamb3/models/esgf.py @@ -3,6 +3,7 @@ from intake_esgf.exceptions import NoSearchResults from ilamb3.exceptions import VarNotInModel +from ilamb3.models.base import Model def _create_cell_measures(ds: xr.Dataset, variable_id: str) -> xr.Dataset: @@ -24,22 +25,33 @@ def _create_cell_measures(ds: xr.Dataset, variable_id: str) -> xr.Dataset: return ds -class ModelESGF: - def __init__(self, source_id: str, variant_label: str, grid_label: str): +class ModelESGF(Model): + def __init__(self, source_id: str, member_id: str, grid_label: str): + self.name = source_id self.search = dict( source_id=source_id, - variant_label=variant_label, + member_id=member_id, grid_label=grid_label, - experiment_id="historical", - frequency="mon", ) def __repr__(self): - return f"{self.search['source_id']}|{self.search['variant_label']}|{self.search['grid_label']}" + return f"{self.search['source_id']}|{self.search['member_id']}|{self.search['grid_label']}" - def get_variable(self, variable_id: str, quiet: bool = True) -> xr.Dataset: + def get_variable( + self, + variable_id: str, + experiment_id: str = "historical", + frequency: str = "mon", + quiet: bool = True, + ) -> xr.Dataset: search = self.search.copy() - search.update({"variable_id": variable_id}) + search.update( + { + "experiment_id": experiment_id, + "variable_id": variable_id, + "frequency": frequency, + } + ) try: cat = ESGFCatalog().search(quiet=quiet, **search) except NoSearchResults: diff --git a/ilamb3/workflow.py b/ilamb3/workflow.py new file mode 100644 index 0000000..ab119e8 --- /dev/null +++ b/ilamb3/workflow.py @@ -0,0 +1,142 @@ +import os +import warnings +from pathlib import Path + +import pandas as pd +import xarray as xr + +import ilamb3 +import ilamb3.analysis as anl +import ilamb3.dataset as dset +from ilamb3.exceptions import AnalysisFailure +from ilamb3.models.base import Model + +DEFAULT_ANALYSES = {"bias": anl.bias_analysis} + + +def open_reference_data(key_or_path: str | Path) -> xr.Dataset: + """ + Load the reference data. + + Parameters + ---------- + key_or_path: str or Path + The key or path to the reference data. First we check if it is a key in the + `ilamb3.ilamb_catalog()` and then if it is a path to a file. That path may be + absolute or relative to an environment variable `ILAMB_ROOT`. + + Returns + ------- + xr.Dataset + The reference data. 
+ """ + key_or_path = Path(key_or_path) + if key_or_path.is_file(): + return xr.open_dataset(key_or_path) + root = Path(os.environ["ILAMB_ROOT"]) + if (root / key_or_path).is_file(): + return xr.open_dataset(root / key_or_path) + cat = ilamb3.ilamb_catalog() + key_or_path = str(key_or_path) + if key_or_path in cat: + return cat[key_or_path].read() + raise FileNotFoundError( + f"'{key_or_path}' is not a key in the ilamb3 catalog, nor is it a valid file as an absolute path or relative to ILAMB_ROOT={root}" + ) + + +def work_model_data(work): + + # Unpack the work + model, analysis_setup = work + + # We will catch all warnings and then put them into the log file + with warnings.catch_warnings(record=True) as _: + warnings.simplefilter("always") + + # If a specialized analysis will be required, then it should be given in the + # analysis_setup. + if "analysis" in analysis_setup: + raise NotImplementedError() + else: + df, ds_ref, ds_com = work_default_analysis(model, **analysis_setup) + + return df, ds_ref, ds_com + + +def work_default_analysis(model: Model, **analysis_setup): + + # Check on inputs + sources = analysis_setup.get("sources", {}) + relationships = analysis_setup.get("relationships", {}) + if len(sources) != 1: + raise ValueError( + f"The default ILAMB analysis requires a single variable and source, but I found: {sources}" + ) + variable = list(sources.keys())[0] + + # Setup the default analysis + analyses = { + name: a(variable) + for name, a in DEFAULT_ANALYSES.items() + if analysis_setup.get(f"skip_{name}", False) is False + } + analyses.update( + { + f"rel_{ind_variable}": anl.relationship_analysis(variable, ind_variable) + for ind_variable in relationships + } + ) + + # Get reference data + ref = {v: open_reference_data(s) for v, s in (sources | relationships).items()} + if relationships: + # Interpolate relationships to the reference grid + lat = ref[variable][dset.get_dim_name(ref[variable], "lat")] + lon = ref[variable][dset.get_dim_name(ref[variable], "lon")] + for v in relationships: + ref[v] = ref[v].interp( + { + dset.get_dim_name(ref[v], "lat"): lat, + dset.get_dim_name(ref[v], "lon"): lon, + }, + method="nearest", + ) + ref = xr.merge([v for _, v in ref.items()]) + + # Get the model data, only use the measures from the primary variables + com = [ + model.get_variable(v).drop_vars("cell_measures", errors="ignore") + for v in set([v for _, a in analyses.items() for v in a.required_variables()]) + ] + com = xr.merge(com) + + # Run the analyses + dfs = [] + ds_refs = [] + ds_coms = [] + for name, a in analyses.items(): + try: + df, ds_ref, ds_com = a(ref, com) + except Exception: + raise AnalysisFailure(name, variable, "?", model.name) + dfs.append(df) + ds_refs.append(ds_ref) + ds_coms.append(ds_com) + + # Merge results + dfs = pd.concat(dfs) + dfs["source"] = dfs["source"].str.replace("Comparison", model.name) + + return dfs, ds_refs, ds_coms + + +def flatten_dict(d: dict, parent_key: str = "", sep: str = "/") -> dict: + items = [] + for k, v in d.items(): + new_key = parent_key + sep + k if parent_key else k + if isinstance(v, dict) and "sources" not in v: + items.extend(flatten_dict(v, new_key, sep=sep).items()) + else: + items.append((new_key, v)) + return dict(items) diff --git a/pyproject.toml b/pyproject.toml index c7e1425..a71a4a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,7 +58,6 @@ version_scheme = "no-guess-dev" local_scheme = "node-and-date" fallback_version = "0.0.0" write_to = "ilamb3/_version.py" -write_to_template = '__version__ = 
"{version}"' [tool.pytest.ini_options] console_output_style = "count" @@ -78,12 +77,12 @@ extend-exclude = [ "doc", ] target-version = "py39" -ignore = [ +lint.ignore = [ "E402", "E501", "E731", ] -select = [ +lint.select = [ "F", # Pyflakes "E", # Pycodestyle "W", From c86f2bc8e9b21d6785be6b036f238c052e437c46 Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Mon, 11 Nov 2024 13:28:31 -0500 Subject: [PATCH 02/11] working checkin --- .pre-commit-config.yaml | 4 -- ilamb3/__init__.py | 3 +- ilamb3/analysis/bias.py | 4 ++ ilamb3/config.py | 87 +++++++++++++++++++++++++++++++++++++++++ ilamb3/workflow.py | 82 ++++++++++++++++++++++++++++++++------ 5 files changed, 164 insertions(+), 16 deletions(-) create mode 100644 ilamb3/config.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ba2029f..dc80aba 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,7 +17,3 @@ repos: rev: 24.4.2 hooks: - id: black - - repo: https://github.com/numpy/numpydoc - rev: v1.7.0 - hooks: - - id: numpydoc-validation diff --git a/ilamb3/__init__.py b/ilamb3/__init__.py index a50fab1..35722fe 100644 --- a/ilamb3/__init__.py +++ b/ilamb3/__init__.py @@ -8,6 +8,7 @@ from cf_xarray.units import units from ilamb3._version import __version__ # noqa +from ilamb3.config import conf # noqa # additional units that pint/cf-xarray does not handle units.define("kg = 1e3 * g") @@ -29,5 +30,5 @@ def ilamb_catalog() -> intake.Catalog: ) -__all__ = ["dataset", "compare", "analysis", "regions", "ilamb_catalog"] +__all__ = ["dataset", "compare", "analysis", "regions", "ilamb_catalog,", "conf"] xr.set_options(keep_attrs=True) diff --git a/ilamb3/analysis/bias.py b/ilamb3/analysis/bias.py index d2c1fb1..0a7f152 100644 --- a/ilamb3/analysis/bias.py +++ b/ilamb3/analysis/bias.py @@ -294,3 +294,7 @@ def _scalar( ) dfs.attrs = dict(method=method) return dfs, ref_out, com_out + + def plots(self, df: pd.DataFrame, ref: xr.Dataset, com: dict[str, xr.Dataset]): + + pass diff --git a/ilamb3/config.py b/ilamb3/config.py new file mode 100644 index 0000000..0c92981 --- /dev/null +++ b/ilamb3/config.py @@ -0,0 +1,87 @@ +"""Configuration for ilamb3""" + +import contextlib +import copy +from pathlib import Path + +import yaml + +defaults = {"build_dir": "./_build"} + + +class Config(dict): + """A global configuration object used in the package.""" + + def __init__(self, filename: Path | None = None, **kwargs): + self.filename = ( + Path(filename) + if filename is not None + else Path.home() / ".config/ilamb3/conf.yaml" + ) + self.filename.parent.mkdir(parents=True, exist_ok=True) + self.reload_all() + self.temp = None + super().__init__(**kwargs) + + def __repr__(self): + return yaml.dump(dict(self)) + + def reset(self): + """Return to defaults.""" + self.clear() + self.update(copy.deepcopy(defaults)) + + def save(self, filename: Path | None = None): + """Save current configuration to file as YAML.""" + filename = filename or self.filename + filename.parent.mkdir(parents=True, exist_ok=True) + with open(filename, "w") as f: + yaml.dump(dict(self), f) + + @contextlib.contextmanager + def _unset(self, temp): + yield + self.clear() + self.update(temp) + + def set( + self, + *, + build_dir: str | None = None, + ): + """Change ilamb3 configuration options.""" + temp = copy.deepcopy(self) + if build_dir is not None: + self["build_dir"] = str(build_dir) + return self._unset(temp) + + def __getitem__(self, item): + if item in self: + return super().__getitem__(item) + elif item in defaults: + return defaults[item] + 
else: + raise KeyError(item) + + def get(self, key, default=None): + if key in self: + return super().__getitem__(key) + return default + + def reload_all(self): + self.reset() + self.load() + + def load(self, filename: Path | None = None): + """Update global config from YAML file or default file if None.""" + filename = filename or self.filename + if filename.is_file(): + with open(filename) as f: + try: + self.update(yaml.safe_load(f)) + except Exception: + pass + + +conf = Config() +conf.reload_all() diff --git a/ilamb3/workflow.py b/ilamb3/workflow.py index ab119e8..79c4f28 100644 --- a/ilamb3/workflow.py +++ b/ilamb3/workflow.py @@ -1,6 +1,9 @@ +import logging import os +import time import warnings from pathlib import Path +from traceback import format_exc import pandas as pd import xarray as xr @@ -33,9 +36,10 @@ def open_reference_data(key_or_path: str | Path) -> xr.Dataset: key_or_path = Path(key_or_path) if key_or_path.is_file(): return xr.open_dataset(key_or_path) - root = Path(os.environ["ILAMB_ROOT"]) - if (root / key_or_path).is_file(): - return xr.open_dataset(root / key_or_path) + if "ILAMB_ROOT" in os.environ: + root = Path(os.environ["ILAMB_ROOT"]) + if (root / key_or_path).is_file(): + return xr.open_dataset(root / key_or_path) cat = ilamb3.ilamb_catalog() key_or_path = str(key_or_path) if key_or_path in cat: @@ -45,21 +49,77 @@ def open_reference_data(key_or_path: str | Path) -> xr.Dataset: ) +def _warning_handler(message, category, filename, lineno, file=None, line=None): + logger = logging.getLogger(str(filename)) + logger.setLevel(logging.WARNING) + file_handler = logging.FileHandler(filename) + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + logger.warning(message) + + +def _get_logger(logfile: str) -> logging.Logger: + + # Where will the log be written? + logfile = Path(logfile).expanduser() + logfile.parent.mkdir(parents=True, exist_ok=True) + if logfile.exists(): + logfile.unlink() + logfile.touch() + + # We need a named logger to avoid other packages that use the root logger + logger = logging.getLogger(str(logfile)) + if not logger.handlers: + # Now setup the file into which we log relevant information + file_handler = logging.FileHandler(logfile) + file_handler.setFormatter( + logging.Formatter( + "[\x1b[36;20m%(asctime)s\033[0m][\x1b[36;20m%(levelname)s\033[0m] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + ) + file_handler.setLevel(logging.INFO) + logger.addHandler(file_handler) + logger.setLevel(logging.INFO) + + # This is probably wrong, but when I log from my logger it logs from parent also + logger.parent.handlers = [] + return logger + + def work_model_data(work): # Unpack the work model, analysis_setup = work - # We will catch all warnings and then put them into the log file - with warnings.catch_warnings(record=True) as _: - warnings.simplefilter("always") + # Setup logging + logfile = ( + Path(ilamb3.conf["build_dir"]) / (analysis_setup["path"]) / f"{model.name}.log" + ) + logger = _get_logger(logfile) - # If a specialized analysis will be required, then it should be given in the + logger.info("Beginning analysis") + with warnings.catch_warnings(record=True) as recorded_warnings: + # If a specialized analysis will be required, then it should be specified in the # analysis_setup. 
- if "analysis" in analysis_setup: - raise NotImplementedError() - else: - df, ds_ref, ds_com = work_default_analysis(model, **analysis_setup) + try: + if "analysis" in analysis_setup: + raise NotImplementedError() + else: + analysis_time = time.time() + df, ds_ref, ds_com = work_default_analysis(model, **analysis_setup) + analysis_time = time.time() - analysis_time + logger.info(f"Analysis completed in {analysis_time:.1f} [s]") + except Exception: + logger.error(format_exc()) + df = pd.DataFrame() + ds_ref = xr.Dataset() + ds_com = xr.Dataset() + + # now dump the warnings + for w in recorded_warnings: + logger.warning(str(w.message)) return df, ds_ref, ds_com From 18a7a2e770d6346f55e6a67eac252890c4ba5a65 Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Tue, 26 Nov 2024 12:40:37 -0500 Subject: [PATCH 03/11] working checkin, big step towards a functional ilamb --- ilamb3/analysis/bias.py | 26 ++- ilamb3/config.py | 4 +- ilamb3/dataset.py | 22 ++ ilamb3/plot.py | 142 +++++++++++++ ilamb3/regions.py | 129 +++++------- ilamb3/templates/dataset_page.html | 308 +++++++++++++++++++++++++++ ilamb3/workflow.py | 321 ++++++++++++++++++++++++++--- pyproject.toml | 3 + 8 files changed, 854 insertions(+), 101 deletions(-) create mode 100644 ilamb3/plot.py create mode 100644 ilamb3/templates/dataset_page.html diff --git a/ilamb3/analysis/bias.py b/ilamb3/analysis/bias.py index 0a7f152..a86b66f 100644 --- a/ilamb3/analysis/bias.py +++ b/ilamb3/analysis/bias.py @@ -13,6 +13,7 @@ import pandas as pd import xarray as xr +import ilamb3.plot as plt from ilamb3 import compare as cmp from ilamb3 import dataset as dset from ilamb3.analysis.base import ILAMBAnalysis @@ -295,6 +296,27 @@ def _scalar( dfs.attrs = dict(method=method) return dfs, ref_out, com_out - def plots(self, df: pd.DataFrame, ref: xr.Dataset, com: dict[str, xr.Dataset]): + def plots( + self, + df: pd.DataFrame, + ref: xr.Dataset, + com: dict[str, xr.Dataset], + regions: list[str], + ): + + # plot_name, region, source + com["Reference"] = ref + df = plt.determine_plot_limits(com).set_index("name") + print(df) + + axs = { + "mean": { + region: { + source: plt.plot_map(ds["mean"], region=region) + for source, ds in com.items() + } + for region in regions + }, + } - pass + return axs diff --git a/ilamb3/config.py b/ilamb3/config.py index 0c92981..f4fd680 100644 --- a/ilamb3/config.py +++ b/ilamb3/config.py @@ -6,7 +6,9 @@ import yaml -defaults = {"build_dir": "./_build"} +defaults = { + "build_dir": "./_build", +} class Config(dict): diff --git a/ilamb3/dataset.py b/ilamb3/dataset.py index f94523a..c484412 100644 --- a/ilamb3/dataset.py +++ b/ilamb3/dataset.py @@ -105,6 +105,28 @@ def get_coord_name( return str(coord_name.pop()) +def is_temporal(da: xr.DataArray) -> bool: + """ + Return if the dataarray is temporal. + + Parameters + ---------- + da : xr.DataArray + The input dataarray. + + Returns + ------- + bool + True if time dimension is present, False otherwise. + """ + try: + get_dim_name(da, "time") + return True + except KeyError: + pass + return False + + def is_spatial(da: xr.DataArray) -> bool: """ Return if the dataarray is spatial. 
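A minimal sketch (not part of the patch) of how the new `is_temporal`/`is_spatial` helpers above are meant to be used; `plot_map()` in the new plot.py below applies the same checks before drawing. Only the ilamb3 imports are real, the function itself is illustrative.

import xarray as xr

import ilamb3.dataset as dset


def classify(da: xr.DataArray) -> str:
    # plot_map() refuses spatio-temporal data; reduce over time first.
    if dset.is_temporal(da):
        return "temporal"
    # Gridded lat/lon data is drawn as a map.
    if dset.is_spatial(da):
        return "spatial"
    # Anything else is treated as site data and scattered on a map.
    return "site"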
diff --git a/ilamb3/plot.py b/ilamb3/plot.py new file mode 100644 index 0000000..4fdd7be --- /dev/null +++ b/ilamb3/plot.py @@ -0,0 +1,142 @@ +import cartopy.crs as ccrs +import cartopy.feature as cfeature +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import xarray as xr + +import ilamb3.dataset as dset +from ilamb3.regions import Regions + + +def get_extents(da: xr.DataArray) -> list[float]: + """Find the extent of the non-null data.""" + lat = xr.where(da.notnull(), da[dset.get_coord_name(da, "lat")], np.nan) + lon = xr.where(da.notnull(), da[dset.get_coord_name(da, "lon")], np.nan) + return [float(lon.min()), float(lon.max()), float(lat.min()), float(lat.max())] + + +def pick_projection(extents: list[float]) -> tuple[ccrs.Projection, float]: + """Given plot extents choose projection and aspect ratio.""" + if (extents[1] - extents[0]) > 300: + aspect_ratio = 2.0 + proj = ccrs.Robinson(central_longitude=0) + if (extents[2] > 0) and (extents[3] > 75): + aspect_ratio = 1.0 + proj = ccrs.Orthographic(central_latitude=+90, central_longitude=0) + if (extents[2] < -75) and (extents[3] < 0): + aspect_ratio = 1.0 + proj = ccrs.Orthographic(central_latitude=-90, central_longitude=0) + else: + aspect_ratio = max(extents[1], extents[0]) - min(extents[1], extents[0]) + aspect_ratio /= max(extents[3], extents[2]) - min(extents[3], extents[2]) + proj = ccrs.PlateCarree(central_longitude=extents[:2].mean()) + return proj, aspect_ratio + + +def finalize_plot(ax: plt.Axes, extents: list[float]) -> plt.Axes: + """Add some final features to our plots.""" + ax.set_title("") # We don't want xarray's title + ax.add_feature( + cfeature.NaturalEarthFeature( + "physical", "land", "110m", edgecolor="face", facecolor="0.875" + ), + zorder=-1, + ) + ax.add_feature( + cfeature.NaturalEarthFeature( + "physical", "ocean", "110m", edgecolor="face", facecolor="0.750" + ), + zorder=-1, + ) + # cleanup plotting extents + percent_pad = 0.1 + if (extents[1] - extents[0]) > 330: + extents[:2] = [-180, 180] # set_extent doesn't like (0,360) + extents[2:] = [-90, 90] + else: + dx = percent_pad * (extents[1] - extents[0]) + dy = percent_pad * (extents[3] - extents[2]) + extents = [extents[0] - dx, extents[1] + dx, extents[2] - dy, extents[3] + dy] + ax.set_extent(extents, ccrs.PlateCarree()) + return ax + + +def plot_map(da: xr.DataArray, **kwargs): + + # Process some options + kwargs["cmap"] = plt.get_cmap(kwargs["cmap"] if "cmap" in kwargs else "viridis", 9) + + # Process region if given + ilamb_regions = Regions() + region = kwargs.pop("region") if "region" in kwargs else None + da = ilamb_regions.restrict_to_region(da, region) + + # Setup figure and its projection + extents = get_extents(da) + proj, aspect = pick_projection(extents) + figsize = kwargs.pop("figsize") if "figsize" in kwargs else (6 * 1.03, 6 / aspect) + _, ax = plt.subplots( + dpi=200, + tight_layout=(kwargs.pop("tight_layout") if "tight_layout" in kwargs else True), + figsize=figsize, + subplot_kw={"projection": proj}, + ) + + # Setup colorbar arguments + cba = {"label": da.attrs["units"]} + if "cbar_kwargs" in kwargs: + cba.update(kwargs.pop("cbar_kwargs")) + + # We can't make temporal maps + if dset.is_temporal(da): + raise ValueError("Cannot make spatio-temporal plots") + + # Space or sites? 
+ if dset.is_spatial(da): + da.plot(ax=ax, transform=ccrs.PlateCarree(), cbar_kwargs=cba, **kwargs) + elif dset.is_site(da): + xr.plot.scatter( + da.to_dataset(), + x=dset.get_coord_name(da, "lon"), + y=dset.get_coord_name(da, "lat"), + hue=da.name, + lw=0, + s=15, + cbar_kwargs=cba, + transform=ccrs.PlateCarree(), + **kwargs, + ) + else: + raise ValueError("plotting error") + + ax = finalize_plot(ax, extents) + return ax + + +def determine_plot_limits( + dsd: xr.Dataset | dict[str, xr.Dataset], percent_pad: float = 1.0 +) -> pd.DataFrame: + """Return a dataframe with the plot minima and maxima.""" + if isinstance(dsd, xr.Dataset): + dsd = {"key": xr.Dataset} + plots = set([var for _, ds in dsd.items() for var in ds.data_vars]) + out = [] + for plot in plots: + # Score maps are always [0,1] + if "score" in plot: + out.append({"name": plot, "low": 0, "high": 1}) + continue + # Otherwise pick a quantile across all data sources + data = np.quantile( + np.hstack( + [ + np.ma.masked_invalid(ds[plot].to_numpy()).compressed() + for _, ds in dsd.items() + if plot in ds + ] + ), + [percent_pad * 0.01, 1 - percent_pad * 0.01], + ) + out.append({"name": plot, "low": data[0], "high": data[1]}) + return pd.DataFrame(out) diff --git a/ilamb3/regions.py b/ilamb3/regions.py index b11d4f5..0f55e7e 100644 --- a/ilamb3/regions.py +++ b/ilamb3/regions.py @@ -1,74 +1,64 @@ """Region definitions for use in the ILAMB system.""" import os -from typing import Literal, Union +from typing import Union import numpy as np import xarray as xr - -def get_dim_name( - dset: Union[xr.Dataset, xr.DataArray], dim: Literal["time", "lat", "lon"] -) -> str: - """Return the name of the `dim` dimension from the dataset. - - Parameters - ---------- - dset - The input dataset/dataarray. - dim - The dimension to find in the dataset/dataarray - - Note - ---- - Duplicate of function in ilamb3.database to avoid circular import. - - """ - dim_names = { - "time": ["time"], - "lat": ["lat", "latitude", "Latitude", "y"], - "lon": ["lon", "longitude", "Longitude", "x"], - "depth": ["depth"], - } - possible_names = dim_names[dim] - dim_name = set(dset.dims).intersection(possible_names) - if len(dim_name) != 1: - msg = f"{dim} dimension not found: {dset.dims}] " - msg += f"not in [{','.join(possible_names)}]" - raise KeyError(msg) - return str(dim_name.pop()) +import ilamb3.dataset as dset def restrict_to_bbox( - var: Union[xr.Dataset, xr.DataArray], - lat_name: str, - lon_name: str, - lat0: float, - latf: float, - lon0: float, - lonf: float, + da: xr.DataArray, lat0: float, latf: float, lon0: float, lonf: float ): - """Return the dataset selected to the nearest bounding box. + """Return the dataarray selected to the nearest bounding box. This is awkward because as of `v2023.6.0`, the `method` keyword cannot be used in slices. Note that this routine will sort the dimensions because slicing does not work well on unsorted indices. 
- """ - var = var.sortby(list(var.dims)) - var = var.sel( + assert isinstance(da, xr.DataArray) + lat_name = dset.get_dim_name(da, "lat") + lon_name = dset.get_dim_name(da, "lon") + da = da.sortby(list(da.dims)) + da = da.sel( { lat_name: slice( - var[lat_name].sel({lat_name: lat0}, method="nearest"), - var[lat_name].sel({lat_name: latf}, method="nearest"), + da[lat_name].sel({lat_name: lat0}, method="nearest"), + da[lat_name].sel({lat_name: latf}, method="nearest"), ), lon_name: slice( - var[lon_name].sel({lon_name: lon0}, method="nearest"), - var[lon_name].sel({lon_name: lonf}, method="nearest"), + da[lon_name].sel({lon_name: lon0}, method="nearest"), + da[lon_name].sel({lon_name: lonf}, method="nearest"), ), } ) - return var + return da + + +def restrict_to_region(da: xr.DataArray, dar: xr.DataArray): + """.""" + assert isinstance(da, xr.DataArray) + lat_name = dset.get_dim_name(da, "lat") + lon_name = dset.get_dim_name(da, "lon") + rlat_name = dset.get_dim_name(dar, "lat") + rlon_name = dset.get_dim_name(dar, "lon") + dar = dar.rename({rlat_name: lat_name, rlon_name: lon_name}) + return restrict_to_bbox( + xr.where( + dar.interp( + {lat_name: da[lat_name], lon_name: da[lon_name]}, + method="nearest", + ), + da, + np.nan, + ), + dar[lat_name].min(), + dar[lat_name].max(), + dar[lon_name].min(), + dar[lon_name].max(), + ) class Regions: @@ -176,41 +166,32 @@ def restrict_to_region( var: Union[xr.Dataset, xr.DataArray], label: Union[str, None], ) -> Union[xr.Dataset, xr.DataArray]: - """Given the region label and a variable, return a mask.""" + """Given the region label and a variable, return a mask. + + ILAMB intermediate netCDF files can contain 2 grids defined by (lat,lon) and + (lat_,lon_, the composed grid). For this reason, we need to restrict to regions + one dataarray at a time. + """ if label is None: return var rdata = Regions._regions[label] rtype = rdata[0] - lat_name = get_dim_name(var, "lat") - lon_name = get_dim_name(var, "lon") if rtype == 0: _, _, rlat, rlon = rdata - out = restrict_to_bbox( - var, lat_name, lon_name, rlat[0], rlat[1], rlon[0], rlon[1] - ) + if isinstance(var, xr.DataArray): + out = restrict_to_bbox(var, rlat[0], rlat[1], rlon[0], rlon[1]) + else: + out = { + key: restrict_to_bbox(var[key], rlat[0], rlat[1], rlon[0], rlon[1]) + for key in var + } return out if rtype == 1: _, _, dar = rdata - rlat_name = get_dim_name(dar, "lat") - rlon_name = get_dim_name(dar, "lon") - dar = dar.rename({rlat_name: lat_name, rlon_name: lon_name}) - out = xr.where( - dar.interp( - {lat_name: var[lat_name], lon_name: var[lon_name]}, - method="nearest", - ), - var, - np.nan, - ) - out = restrict_to_bbox( - out, - lat_name, - lon_name, - dar[lat_name].min(), - dar[lat_name].max(), - dar[lon_name].min(), - dar[lon_name].max(), - ) + if isinstance(var, xr.DataArray): + out = restrict_to_region(var, dar) + else: + out = {key: restrict_to_region(var[key], dar) for key in var} return out raise ValueError(f"Region type #{rtype} not recognized") diff --git a/ilamb3/templates/dataset_page.html b/ilamb3/templates/dataset_page.html new file mode 100644 index 0000000..710ccd5 --- /dev/null +++ b/ilamb3/templates/dataset_page.html @@ -0,0 +1,308 @@ + + + + + +{{ page_header }} + + + + + + + + + +
+  <!-- [markup stripped during extraction] The remaining lines of this new
+       template lay out a "Scalar Table" section, one section per analysis
+       ({% for name in analyses %} ... {{name}} ... {% endfor %}), and an
+       "All Models" section, plus the page's supporting markup. -->
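A sketch (assuming jinja2 and an installed ilamb3 that packages this template) of how the page is produced; `post_model_data()` in workflow.py below builds the real `data` dict, so the values here are only stand-ins.

from pathlib import Path

from jinja2 import Template

import ilamb3

data = {  # stand-in for the dict assembled in post_model_data()
    "page_header": "gpp | FLUXCOM | 1980-2013",
    "model_names": ["CanESM5"],
    "regions": {None: "All Data"},
    "analyses": ["bias"],
    "data_information": {"Title": "FLUXCOM"},
    "table_data": "[]",
}
template = (Path(ilamb3.__path__[0]) / "templates/dataset_page.html").read_text()
Path("index.html").write_text(Template(template).render(data))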
+ + diff --git a/ilamb3/workflow.py b/ilamb3/workflow.py index 79c4f28..b5e9817 100644 --- a/ilamb3/workflow.py +++ b/ilamb3/workflow.py @@ -1,18 +1,29 @@ +import glob +import itertools import logging import os +import re import time import warnings +from inspect import isclass from pathlib import Path from traceback import format_exc import pandas as pd import xarray as xr +import yaml +from jinja2 import Template +from mpi4py.futures import MPIPoolExecutor +from tqdm import tqdm import ilamb3 import ilamb3.analysis as anl import ilamb3.dataset as dset +import ilamb3.models as ilamb_models +from ilamb3.analysis.base import ILAMBAnalysis from ilamb3.exceptions import AnalysisFailure from ilamb3.models.base import Model +from ilamb3.regions import Regions DEFAULT_ANALYSES = {"bias": anl.bias_analysis} @@ -49,22 +60,11 @@ def open_reference_data(key_or_path: str | Path) -> xr.Dataset: ) -def _warning_handler(message, category, filename, lineno, file=None, line=None): - logger = logging.getLogger(str(filename)) - logger.setLevel(logging.WARNING) - file_handler = logging.FileHandler(filename) - formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") - file_handler.setFormatter(formatter) - logger.addHandler(file_handler) - logger.warning(message) - - -def _get_logger(logfile: str) -> logging.Logger: +def _get_logger(logfile: str, reset: bool = False) -> logging.Logger: # Where will the log be written? logfile = Path(logfile).expanduser() - logfile.parent.mkdir(parents=True, exist_ok=True) - if logfile.exists(): + if reset and logfile.exists(): logfile.unlink() logfile.touch() @@ -94,12 +94,11 @@ def work_model_data(work): model, analysis_setup = work # Setup logging - logfile = ( - Path(ilamb3.conf["build_dir"]) / (analysis_setup["path"]) / f"{model.name}.log" - ) - logger = _get_logger(logfile) + root = Path(ilamb3.conf["build_dir"]) / (analysis_setup["path"]) + logfile = root / f"{model.name}.log" + logger = _get_logger(logfile, reset=True) - logger.info("Beginning analysis") + logger.info(f"Begin {analysis_setup['path']} {model.name}") with warnings.catch_warnings(record=True) as recorded_warnings: # If a specialized analysis will be required, then it should be specified in the # analysis_setup. 
@@ -109,19 +108,134 @@ def work_model_data(work): else: analysis_time = time.time() df, ds_ref, ds_com = work_default_analysis(model, **analysis_setup) + + # serialize, the reference is tricky as many threads may be trying + df.to_csv(root / f"{model.name}.csv", index=False) + ds_com.to_netcdf(root / f"{model.name}.nc") + ref_file = root / "Reference.nc" + if not ref_file.is_file(): + try: + ds_ref.to_netcdf(ref_file) + logger.info(f"Saved reference file: {ref_file}") + except Exception: + logger.info("Did not save reference file.") + analysis_time = time.time() - analysis_time logger.info(f"Analysis completed in {analysis_time:.1f} [s]") except Exception: logger.error(format_exc()) - df = pd.DataFrame() - ds_ref = xr.Dataset() - ds_com = xr.Dataset() - # now dump the warnings + # Dump the warnings to the logfile + for w in recorded_warnings: + logger.warning(str(w.message)) + + return + + +def load_local_assets(root: str | Path) -> tuple[pd.DataFrame, dict[str, xr.Dataset]]: + root = Path(root) + df = pd.concat( + [ + pd.read_csv(f, keep_default_na=False, na_values=["NaN"]) + for f in glob.glob(str(root / "*.csv")) + ] + ).drop_duplicates(subset=["source", "region", "analysis", "name"]) + df["name"] = df["name"] + " [" + df["units"] + "]" + dsd = { + Path(key).stem: xr.open_dataset(key) for key in glob.glob(str(root / "*.nc")) + } + # consistency checks + assert set(df["source"]) == set(dsd.keys()) + return df, dsd + + +def post_model_data(analysis_setup): + """ + - combine csv files, remove references + - read in netcdf files + - initialize analysis + - call post + - render html + """ + # Setup logging + root = Path(ilamb3.conf["build_dir"]) / (analysis_setup["path"]) + logfile = root / "post.log" + logger = _get_logger(logfile, reset=True) + + analyses = setup_analyses(**analysis_setup) + ilamb_regions = Regions() + logger.info(f"Begin post {analysis_setup['path']}") + with warnings.catch_warnings(record=True) as recorded_warnings: + post_time = time.time() + + # Load what we have in the local directory + df, com = load_local_assets(root) + ref = com.pop("Reference") if "Reference" in com else xr.Dataset() + + # Make plots + for _, analysis in analyses.items(): + if "plots" in dir(analysis): + analysis.plots(df, ref, com, df["region"].unique()) + + # Write out html + df = df.reset_index(drop=True) + df["id"] = df.index + data = { + "page_header": "gpp | FLUXCOM | 1980-2013", + "model_names": [m for m in df["source"].unique() if m != "Reference"], + "regions": { + (None if key == "None" else key): ( + "All Data" if key == "None" else ilamb_regions.get_name(key) + ) + for key in df["region"].unique() + }, + "analyses": list(df["analysis"].unique()), + "data_information": { + "Title": "FLUXCOM (RS+METEO) Global Land Carbon Fluxes using CRUNCEP climate data", + "Institutions": "Department Biogeochemical Integration, Max Planck Institute for Biogeochemistry, Germany", + "Version": "1", + }, + "table_data": str( + [row.to_dict() for _, row in df.drop(columns="units").iterrows()] + ), + } + template = open(Path(ilamb3.__path__) / "templates/dataset_page.html").read() + open("tmp.html", mode="w").write(Template(template).render(data)) + + post_time = time.time() - post_time + logger.info(f"Post-processing completed in {post_time:.1f} [s]") + + # Dump the warnings to the logfile for w in recorded_warnings: logger.warning(str(w.message)) - return df, ds_ref, ds_com + return + + +def setup_analyses(**analysis_setup) -> dict[str, ILAMBAnalysis]: + + # Check on inputs + sources = 
analysis_setup.get("sources", {}) + relationships = analysis_setup.get("relationships", {}) + if len(sources) != 1: + raise ValueError( + f"The default ILAMB analysis requires a single variable and source, but I found: {sources}" + ) + variable = list(sources.keys())[0] + + # Setup the default analysis + analyses = { + name: a(variable) + for name, a in DEFAULT_ANALYSES.items() + if analysis_setup.get(f"skip_{name}", False) is False + } + analyses.update( + { + f"rel_{ind_variable}": anl.relationship_analysis(variable, ind_variable) + for ind_variable in relationships + } + ) + return analyses def work_default_analysis(model: Model, **analysis_setup): @@ -187,8 +301,10 @@ def work_default_analysis(model: Model, **analysis_setup): # Merge results dfs = pd.concat(dfs) dfs["source"] = dfs["source"].str.replace("Comparison", model.name) + ds_ref = xr.merge(ds_refs).pint.dequantify() + ds_com = xr.merge(ds_coms).pint.dequantify() - return dfs, ds_refs, ds_coms + return dfs, ds_ref, ds_com def flatten_dict(d: dict, parent_key: str = "", sep: str = "/") -> dict: @@ -200,3 +316,160 @@ def flatten_dict(d: dict, parent_key: str = "", sep: str = "/") -> dict: else: items.append((new_key, v)) return dict(items) + + +def parse_model_setup(yaml_file: str | Path) -> list[ilamb_models.Model]: + """Parse the model setup file.""" + # parse yaml file + yaml_file = Path(yaml_file) + with open(yaml_file) as fin: + setups = yaml.safe_load(fin) + assert isinstance(setups, dict) + + # does this bit belong elsewhere? + abstract_models = { + name: model + for name, model in ilamb_models.__dict__.items() + if (isclass(model) and issubclass(model, ilamb_models.Model)) + } + + # setup models, needs error checking + models = [] + for name, setup in tqdm( + setups.items(), + bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt}{postfix}]", + desc="Initializing Models", + unit="model", + ): + if "type" in setup: + model_type = setup.pop("type") + mod = abstract_models[model_type](**setup) + models.append(mod) + else: + paths = setup.pop("paths") + mod = abstract_models["Model"](name=name, **setup).find_files(paths) + models.append(mod) + return models + + +def parse_benchmark_setup(yaml_file: str | Path) -> dict: + """Parse the file which is analagous to the old configure file.""" + yaml_file = Path(yaml_file) + with open(yaml_file) as fin: + analyses = yaml.safe_load(fin) + assert isinstance(analyses, dict) + return analyses + + +def _clean_pathname(filename: str) -> str: + """Removes characters we do not want in our paths.""" + invalid_chars = r'[\\/:*?"<>|\s]' + cleaned_filename = re.sub(invalid_chars, "", filename) + return cleaned_filename + + +def _is_leaf(current: dict) -> bool: + """Is the current item in the nested dictionary a leaf?""" + if not isinstance(current, dict): + return False + if "sources" in current: + return True + return False + + +def _add_path(current: dict, path: Path | None = None) -> dict: + """Recursively add the nested dictionary headings as a `path` in the leaves.""" + path = Path() if path is None else path + for key, val in current.items(): + if not isinstance(val, dict): + continue + key_path = path / Path(_clean_pathname(key)) + if _is_leaf(val): + val["path"] = str(key_path) + else: + current[key] = _add_path(val, key_path) + return current + + +def _to_leaf_list(current: dict, leaf_list: list | None = None) -> list: + """Recursively flatten the nested dictionary only returning the leaves.""" + leaf_list = [] if leaf_list is None else leaf_list + for _, val 
in current.items(): + if not isinstance(val, dict): + continue + if _is_leaf(val): + leaf_list.append(val) + else: + _to_leaf_list(val, leaf_list) + return leaf_list + + +def _create_paths(current: dict, root: Path = Path("_build")): + """Recursively ensure paths in the leaves are created.""" + for _, val in current.items(): + if not isinstance(val, dict): + continue + if _is_leaf(val): + if "path" in val: + (root / Path(val["path"])).mkdir(parents=True, exist_ok=True) + else: + _create_paths(val, root) + + +def _is_complete(work, root) -> bool: + """Have we already performed this work?""" + model, setup = work + scalar_file = Path(root) / setup["path"] / f"{model.name}.csv" + if scalar_file.is_file(): + return True + return False + + +def run_study(study_setup: str, model_setup: str): + + # Define the models + models = parse_model_setup(model_setup) + + # Some yaml text that would get parsed like a dictionary. + analyses = parse_benchmark_setup(study_setup) + + # The yaml analysis setup can be as structured as the user needs. We are no longer + # limited to the `h1` and `h2` headers from ILAMB 2.x. We will detect leaf nodes by + # the presence of a `sources` dictionary. + analyses = _add_path(analyses) + + # Various traversal actions + _create_paths(analyses) + + # Create a list of just the leaves to use in creation all work combinations + analyses_list = _to_leaf_list(analyses) + + # Create a work list but remove things that are already 'done' + work_list = [ + w + for w in itertools.product(models, analyses_list) + if not _is_complete(w, ilamb3.conf["build_dir"]) + ] + + # Phase I + if work_list: + with MPIPoolExecutor() as executor: + results = tqdm( + executor.map(work_model_data, work_list), + bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s}{postfix}]", + desc="Analysis of Model-Data Pairs", + unit="pair", + total=len(work_list), + ) + results = list(results) + + # Phase 2: plotting + with MPIPoolExecutor() as executor: + results = tqdm( + executor.map(post_model_data, analyses_list), + bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s}{postfix}]", + desc="Post-processing", + unit="analysis", + total=len(analyses_list), + ) + results = list(results) diff --git a/pyproject.toml b/pyproject.toml index a71a4a5..a16c700 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,9 @@ local_scheme = "node-and-date" fallback_version = "0.0.0" write_to = "ilamb3/_version.py" +[tool.setuptools.package-data] +ilamb3 = ["templates/*.html"] + [tool.pytest.ini_options] console_output_style = "count" addopts = "--cov=ilamb3 --cov-report=xml --verbose" From ec7474d4b855bf9d8349fc9027a89e486052d55c Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Tue, 26 Nov 2024 16:33:26 -0500 Subject: [PATCH 04/11] functional workflow, html pages not yet complete --- ilamb3/analysis/bias.py | 55 +++++++++++++++++++++++++++++------------ ilamb3/dataset.py | 4 +-- ilamb3/plot.py | 9 ++++++- ilamb3/workflow.py | 30 +++++++++++++++++----- 4 files changed, 73 insertions(+), 25 deletions(-) diff --git a/ilamb3/analysis/bias.py b/ilamb3/analysis/bias.py index a86b66f..58a3cb9 100644 --- a/ilamb3/analysis/bias.py +++ b/ilamb3/analysis/bias.py @@ -29,6 +29,8 @@ class bias_analysis(ILAMBAnalysis): ---------- required_variable : str The name of the variable to be used in this analysis. + variable_cmap : str + The colormap to use in plots of the comparison variable, optional. 
Methods ------- @@ -38,8 +40,11 @@ class bias_analysis(ILAMBAnalysis): The method """ - def __init__(self, required_variable: str): # numpydoc ignore=GL08 + def __init__( + self, required_variable: str, variable_cmap: str = "viridis" + ): # numpydoc ignore=GL08 self.req_variable = required_variable + self.cmap = variable_cmap def required_variables(self) -> list[str]: """ @@ -196,7 +201,7 @@ def __call__( if use_uncertainty: ref_out["uncert"] = uncert com_out = bias.to_dataset(name="bias") - com_out["bias_score"] = score + com_out["biasscore"] = score try: lat_name = dset.get_dim_name(com_mean, "lat") lon_name = dset.get_dim_name(com_mean, "lon") @@ -262,7 +267,7 @@ def _scalar( ] ) # Bias Score - bias_scalar_score = _scalar(com_out, "bias_score", region, True, True) + bias_scalar_score = _scalar(com_out, "biasscore", region, True, True) with warnings.catch_warnings(): warnings.filterwarnings( "ignore", "divide by zero encountered in divide", RuntimeWarning @@ -301,22 +306,40 @@ def plots( df: pd.DataFrame, ref: xr.Dataset, com: dict[str, xr.Dataset], - regions: list[str], - ): + ) -> pd.DataFrame: - # plot_name, region, source + # Some initialization + regions = [None if r == "None" else r for r in df["region"].unique()] com["Reference"] = ref + + # Setup plot data df = plt.determine_plot_limits(com).set_index("name") - print(df) + df.loc["mean", "cmap"] = self.cmap + df.loc["bias", "cmap"] = "seismic" + df.loc["biasscore", "cmap"] = "plasma" - axs = { - "mean": { - region: { - source: plt.plot_map(ds["mean"], region=region) - for source, ds in com.items() - } - for region in regions - }, - } + # Build up a dataframe of matplotlib axes + axs = [ + { + "name": plot, + "region": region, + "source": source, + "axis": ( + plt.plot_map( + ds[plot], + region=region, + vmin=df.loc[plot, "low"], + vmax=df.loc[plot, "high"], + cmap=df.loc[plot, "cmap"], + ) + if plot in ds + else pd.NA + ), + } + for plot in df.index + for source, ds in com.items() + for region in regions + ] + axs = pd.DataFrame(axs).dropna(subset=["axis"]) return axs diff --git a/ilamb3/dataset.py b/ilamb3/dataset.py index c484412..e2d97c0 100644 --- a/ilamb3/dataset.py +++ b/ilamb3/dataset.py @@ -93,8 +93,8 @@ def get_coord_name( get_dim_name : A variant when the coordinate is a dimension. 
""" coord_names = { - "lat": ["lat", "latitude", "Latitude", "y"], - "lon": ["lon", "longitude", "Longitude", "x"], + "lat": ["lat", "latitude", "Latitude", "y", "lat_"], + "lon": ["lon", "longitude", "Longitude", "x", "lon_"], } possible_names = coord_names[coord] coord_name = set(dset.coords).intersection(possible_names) diff --git a/ilamb3/plot.py b/ilamb3/plot.py index 4fdd7be..26ec9b0 100644 --- a/ilamb3/plot.py +++ b/ilamb3/plot.py @@ -115,7 +115,9 @@ def plot_map(da: xr.DataArray, **kwargs): def determine_plot_limits( - dsd: xr.Dataset | dict[str, xr.Dataset], percent_pad: float = 1.0 + dsd: xr.Dataset | dict[str, xr.Dataset], + percent_pad: float = 1.0, + symmetrize: list[str] = ["bias"], ) -> pd.DataFrame: """Return a dataframe with the plot minima and maxima.""" if isinstance(dsd, xr.Dataset): @@ -138,5 +140,10 @@ def determine_plot_limits( ), [percent_pad * 0.01, 1 - percent_pad * 0.01], ) + # symmetrize + if plot in symmetrize: + vmax = max(np.abs(data)) + data[0] = -vmax + data[1] = vmax out.append({"name": plot, "low": data[0], "high": data[1]}) return pd.DataFrame(out) diff --git a/ilamb3/workflow.py b/ilamb3/workflow.py index b5e9817..1948e0f 100644 --- a/ilamb3/workflow.py +++ b/ilamb3/workflow.py @@ -172,10 +172,18 @@ def post_model_data(analysis_setup): df, com = load_local_assets(root) ref = com.pop("Reference") if "Reference" in com else xr.Dataset() - # Make plots + # Make plots and load them into a dataframe + df_plots = [] for _, analysis in analyses.items(): if "plots" in dir(analysis): - analysis.plots(df, ref, com, df["region"].unique()) + df_plots += [analysis.plots(df, ref, com)] + df_plots = pd.concat(df_plots, ignore_index=True) + + # Write plots + for _, row in df_plots.iterrows(): + row["axis"].get_figure().savefig( + root / f"{row['source']}_{row['region']}_{row['name']}.png" + ) # Write out html df = df.reset_index(drop=True) @@ -199,8 +207,8 @@ def post_model_data(analysis_setup): [row.to_dict() for _, row in df.drop(columns="units").iterrows()] ), } - template = open(Path(ilamb3.__path__) / "templates/dataset_page.html").read() - open("tmp.html", mode="w").write(Template(template).render(data)) + template = open(Path(ilamb3.__path__[0]) / "templates/dataset_page.html").read() + open(root / "index.html", mode="w").write(Template(template).render(data)) post_time = time.time() - post_time logger.info(f"Post-processing completed in {post_time:.1f} [s]") @@ -224,8 +232,13 @@ def setup_analyses(**analysis_setup) -> dict[str, ILAMBAnalysis]: variable = list(sources.keys())[0] # Setup the default analysis + cmap = ( + analysis_setup.pop("variable_cmap") + if "variable_cmap" in analysis_setup + else "viridis" + ) analyses = { - name: a(variable) + name: a(variable, variable_cmap=cmap) for name, a in DEFAULT_ANALYSES.items() if analysis_setup.get(f"skip_{name}", False) is False } @@ -250,8 +263,13 @@ def work_default_analysis(model: Model, **analysis_setup): variable = list(sources.keys())[0] # Setup the default analysis + cmap = ( + analysis_setup.pop("variable_cmap") + if "variable_cmap" in analysis_setup + else "viridis" + ) analyses = { - name: a(variable) + name: a(variable, variable_cmap=cmap) for name, a in DEFAULT_ANALYSES.items() if analysis_setup.get(f"skip_{name}", False) is False } From a09e32441e6658803bfb4691c1c64950219a4057 Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Tue, 26 Nov 2024 18:12:06 -0500 Subject: [PATCH 05/11] embed the post in try/except blocks --- ilamb3/workflow.py | 105 ++++++++++++++++++++++++++------------------- 1 file 
changed, 60 insertions(+), 45 deletions(-) diff --git a/ilamb3/workflow.py b/ilamb3/workflow.py index 1948e0f..b78a070 100644 --- a/ilamb3/workflow.py +++ b/ilamb3/workflow.py @@ -150,13 +150,7 @@ def load_local_assets(root: str | Path) -> tuple[pd.DataFrame, dict[str, xr.Data def post_model_data(analysis_setup): - """ - - combine csv files, remove references - - read in netcdf files - - initialize analysis - - call post - - render html - """ + """.""" # Setup logging root = Path(ilamb3.conf["build_dir"]) / (analysis_setup["path"]) logfile = root / "post.log" @@ -169,46 +163,61 @@ def post_model_data(analysis_setup): post_time = time.time() # Load what we have in the local directory - df, com = load_local_assets(root) - ref = com.pop("Reference") if "Reference" in com else xr.Dataset() - - # Make plots and load them into a dataframe - df_plots = [] - for _, analysis in analyses.items(): - if "plots" in dir(analysis): - df_plots += [analysis.plots(df, ref, com)] - df_plots = pd.concat(df_plots, ignore_index=True) - - # Write plots - for _, row in df_plots.iterrows(): - row["axis"].get_figure().savefig( - root / f"{row['source']}_{row['region']}_{row['name']}.png" - ) + try: + df, com = load_local_assets(root) + ref = com.pop("Reference") if "Reference" in com else xr.Dataset() + except Exception: + logger.error("An exception was encountered loading local assets.") + logger.error(format_exc()) + return - # Write out html - df = df.reset_index(drop=True) - df["id"] = df.index - data = { - "page_header": "gpp | FLUXCOM | 1980-2013", - "model_names": [m for m in df["source"].unique() if m != "Reference"], - "regions": { - (None if key == "None" else key): ( - "All Data" if key == "None" else ilamb_regions.get_name(key) + # Make plots and write plots + try: + df_plots = [] + for _, analysis in analyses.items(): + if "plots" in dir(analysis): + df_plots += [analysis.plots(df, ref, com)] + df_plots = pd.concat(df_plots, ignore_index=True) + for _, row in df_plots.iterrows(): + row["axis"].get_figure().savefig( + root / f"{row['source']}_{row['region']}_{row['name']}.png" ) - for key in df["region"].unique() - }, - "analyses": list(df["analysis"].unique()), - "data_information": { - "Title": "FLUXCOM (RS+METEO) Global Land Carbon Fluxes using CRUNCEP climate data", - "Institutions": "Department Biogeochemical Integration, Max Planck Institute for Biogeochemistry, Germany", - "Version": "1", - }, - "table_data": str( - [row.to_dict() for _, row in df.drop(columns="units").iterrows()] - ), - } - template = open(Path(ilamb3.__path__[0]) / "templates/dataset_page.html").read() - open(root / "index.html", mode="w").write(Template(template).render(data)) + except Exception: + logger.error("An exception was encountered creating plots") + logger.error(format_exc()) + return + + # Write out html + try: + df = df.reset_index(drop=True) + df["id"] = df.index + data = { + "page_header": ref.attrs["header"] if "header" in ref.attrs else "", + "model_names": [m for m in df["source"].unique() if m != "Reference"], + "regions": { + (None if key == "None" else key): ( + "All Data" if key == "None" else ilamb_regions.get_name(key) + ) + for key in df["region"].unique() + }, + "analyses": list(df["analysis"].unique()), + "data_information": { + key.capitalize(): ref.attrs[key] + for key in ["title", "institutions", "version"] + if key in ref.attrs + }, + "table_data": str( + [row.to_dict() for _, row in df.drop(columns="units").iterrows()] + ), + } + template = open( + Path(ilamb3.__path__[0]) / 
"templates/dataset_page.html" + ).read() + open(root / "index.html", mode="w").write(Template(template).render(data)) + except Exception: + logger.error("An exception was encountered creating the html page.") + logger.error(format_exc()) + return post_time = time.time() - post_time logger.info(f"Post-processing completed in {post_time:.1f} [s]") @@ -282,6 +291,10 @@ def work_default_analysis(model: Model, **analysis_setup): # Get reference data ref = {v: open_reference_data(s) for v, s in (sources | relationships).items()} + header = f"{variable} | {Path(analysis_setup['path']).name}" + if dset.is_temporal(ref[variable]): + header += f" | {ref[variable]['time'].dt.year.min():d}-{ref[variable]['time'].dt.year.max():d}" + attrs = ref[variable].attrs if relationships: # Interpolate relationships to the reference grid lat = ref[variable][dset.get_dim_name(ref[variable], "lat")] @@ -321,6 +334,8 @@ def work_default_analysis(model: Model, **analysis_setup): dfs["source"] = dfs["source"].str.replace("Comparison", model.name) ds_ref = xr.merge(ds_refs).pint.dequantify() ds_com = xr.merge(ds_coms).pint.dequantify() + ds_ref.attrs = attrs + ds_ref.attrs["header"] = header return dfs, ds_ref, ds_com From 1676a16be606ed4d4581dbda50cf7ab48c809d50 Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Wed, 27 Nov 2024 09:48:12 -0500 Subject: [PATCH 06/11] add titles to figures --- ilamb3/analysis/bias.py | 7 ++++--- ilamb3/plot.py | 4 ++-- ilamb3/workflow.py | 6 +++--- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ilamb3/analysis/bias.py b/ilamb3/analysis/bias.py index 58a3cb9..2c186b5 100644 --- a/ilamb3/analysis/bias.py +++ b/ilamb3/analysis/bias.py @@ -314,9 +314,9 @@ def plots( # Setup plot data df = plt.determine_plot_limits(com).set_index("name") - df.loc["mean", "cmap"] = self.cmap - df.loc["bias", "cmap"] = "seismic" - df.loc["biasscore", "cmap"] = "plasma" + df.loc["mean", ["cmap", "title"]] = [self.cmap, "Period Mean"] + df.loc["bias", ["cmap", "title"]] = ["seismic", "Bias"] + df.loc["biasscore", ["cmap", "title"]] = ["plasma", "Bias Score"] # Build up a dataframe of matplotlib axes axs = [ @@ -331,6 +331,7 @@ def plots( vmin=df.loc[plot, "low"], vmax=df.loc[plot, "high"], cmap=df.loc[plot, "cmap"], + title=source + " " + df.loc[plot, "title"], ) if plot in ds else pd.NA diff --git a/ilamb3/plot.py b/ilamb3/plot.py index 26ec9b0..64dcec5 100644 --- a/ilamb3/plot.py +++ b/ilamb3/plot.py @@ -36,7 +36,6 @@ def pick_projection(extents: list[float]) -> tuple[ccrs.Projection, float]: def finalize_plot(ax: plt.Axes, extents: list[float]) -> plt.Axes: """Add some final features to our plots.""" - ax.set_title("") # We don't want xarray's title ax.add_feature( cfeature.NaturalEarthFeature( "physical", "land", "110m", edgecolor="face", facecolor="0.875" @@ -66,6 +65,7 @@ def plot_map(da: xr.DataArray, **kwargs): # Process some options kwargs["cmap"] = plt.get_cmap(kwargs["cmap"] if "cmap" in kwargs else "viridis", 9) + title = kwargs.pop("title") if "title" in kwargs else "" # Process region if given ilamb_regions = Regions() @@ -109,7 +109,7 @@ def plot_map(da: xr.DataArray, **kwargs): ) else: raise ValueError("plotting error") - + ax.set_title(title) ax = finalize_plot(ax, extents) return ax diff --git a/ilamb3/workflow.py b/ilamb3/workflow.py index b78a070..7dc5795 100644 --- a/ilamb3/workflow.py +++ b/ilamb3/workflow.py @@ -484,12 +484,12 @@ def run_study(study_setup: str, model_setup: str): if not _is_complete(w, ilamb3.conf["build_dir"]) ] - # Phase I + # Phase 1: analysis if 
work_list: with MPIPoolExecutor() as executor: results = tqdm( executor.map(work_model_data, work_list), - bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s}{postfix}]", + bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s} {postfix}]", desc="Analysis of Model-Data Pairs", unit="pair", total=len(work_list), @@ -500,7 +500,7 @@ def run_study(study_setup: str, model_setup: str): with MPIPoolExecutor() as executor: results = tqdm( executor.map(post_model_data, analyses_list), - bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s}{postfix}]", + bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s} {postfix}]", desc="Post-processing", unit="analysis", total=len(analyses_list), From cbcf151118cd9515656b8180514688042f523c55 Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Wed, 27 Nov 2024 12:56:02 -0500 Subject: [PATCH 07/11] analysis plots populate dynamically --- ilamb3/analysis/bias.py | 2 +- ilamb3/templates/dataset_page.html | 19 ++++++++++++------ ilamb3/workflow.py | 31 ++++++++++++++++++++++++------ 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/ilamb3/analysis/bias.py b/ilamb3/analysis/bias.py index 2c186b5..3b7dbf3 100644 --- a/ilamb3/analysis/bias.py +++ b/ilamb3/analysis/bias.py @@ -337,7 +337,7 @@ def plots( else pd.NA ), } - for plot in df.index + for plot in ["mean", "bias", "biasscore"] for source, ds in com.items() for region in regions ] diff --git a/ilamb3/templates/dataset_page.html b/ilamb3/templates/dataset_page.html index 710ccd5..b632a03 100644 --- a/ilamb3/templates/dataset_page.html +++ b/ilamb3/templates/dataset_page.html @@ -85,7 +85,7 @@ From c6a16b63f8ce1952544bab3619c7870935e1748d Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Wed, 27 Nov 2024 16:47:10 -0500 Subject: [PATCH 09/11] regional analysis works --- ilamb3/analysis/bias.py | 1 - ilamb3/config.py | 12 ++++++++++++ ilamb3/dataset.py | 4 ++-- ilamb3/plot.py | 2 +- ilamb3/templates/dataset_page.html | 8 ++++---- ilamb3/workflow.py | 2 +- 6 files changed, 20 insertions(+), 9 deletions(-) diff --git a/ilamb3/analysis/bias.py b/ilamb3/analysis/bias.py index 1a6d7f0..fcceb17 100644 --- a/ilamb3/analysis/bias.py +++ b/ilamb3/analysis/bias.py @@ -114,7 +114,6 @@ def __call__( varname = self.req_variable if use_uncertainty and "bounds" not in ref[varname].attrs: use_uncertainty = False - # Checks on the database if it is being used if method == "RegionalQuantiles": check_quantile_database(quantile_dbase) diff --git a/ilamb3/config.py b/ilamb3/config.py index f4fd680..11097aa 100644 --- a/ilamb3/config.py +++ b/ilamb3/config.py @@ -6,8 +6,11 @@ import yaml +import ilamb3.regions as reg + defaults = { "build_dir": "./_build", + "regions": [None], } @@ -50,11 +53,20 @@ def set( self, *, build_dir: str | None = None, + regions: list[str] | None = None, ): """Change ilamb3 configuration options.""" temp = copy.deepcopy(self) if build_dir is not None: self["build_dir"] = str(build_dir) + if regions is not None: + ilamb_regions = reg.Regions() + does_not_exist = set(regions) - set(ilamb_regions._regions) - set([None]) + if does_not_exist: + raise ValueError( + f"Cannot run ILAMB over these regions [{list(does_not_exist)}] which are not registered in our system [{list(ilamb_regions._regions)}]" + ) + self["regions"] = regions return self._unset(temp) def __getitem__(self, item): diff --git a/ilamb3/dataset.py b/ilamb3/dataset.py index e2d97c0..2649ef8 100644 --- 
a/ilamb3/dataset.py +++ b/ilamb3/dataset.py @@ -5,8 +5,8 @@ import numpy as np import xarray as xr +import ilamb3.regions as ilreg from ilamb3.exceptions import NoSiteDimension -from ilamb3.regions import Regions def get_dim_name( @@ -550,7 +550,7 @@ def integrate_space( only be done in a single dimension at a time. """ if region is not None: - regions = Regions() + regions = ilreg.Regions() dset = regions.restrict_to_region(dset, region) space = [get_dim_name(dset, "lat"), get_dim_name(dset, "lon")] if not isinstance(dset, xr.Dataset): diff --git a/ilamb3/plot.py b/ilamb3/plot.py index 64dcec5..bc5d581 100644 --- a/ilamb3/plot.py +++ b/ilamb3/plot.py @@ -30,7 +30,7 @@ def pick_projection(extents: list[float]) -> tuple[ccrs.Projection, float]: else: aspect_ratio = max(extents[1], extents[0]) - min(extents[1], extents[0]) aspect_ratio /= max(extents[3], extents[2]) - min(extents[3], extents[2]) - proj = ccrs.PlateCarree(central_longitude=extents[:2].mean()) + proj = ccrs.PlateCarree(central_longitude=np.array(extents)[:2].mean()) return proj, aspect_ratio diff --git a/ilamb3/templates/dataset_page.html b/ilamb3/templates/dataset_page.html index 68eebe8..6cea4a0 100644 --- a/ilamb3/templates/dataset_page.html +++ b/ilamb3/templates/dataset_page.html @@ -293,12 +293,12 @@

{{aname}}

-{% for pname,plot in analysis.items() %} -{% for item in plot %} +{% for pname,plot in analysis.items() -%} +{% for item in plot -%} {% for mname,pattern in item.items() %} -{% endfor %} -{% endfor %} +{% endfor -%} +{% endfor -%} {% endfor %} {% endfor %} diff --git a/ilamb3/workflow.py b/ilamb3/workflow.py index 0e3c587..c76f2dc 100644 --- a/ilamb3/workflow.py +++ b/ilamb3/workflow.py @@ -341,7 +341,7 @@ def work_default_analysis(model: Model, **analysis_setup): ds_coms = [] for name, a in analyses.items(): try: - df, ds_ref, ds_com = a(ref, com) + df, ds_ref, ds_com = a(ref, com, regions=ilamb3.conf["regions"]) except Exception: raise AnalysisFailure(name, variable, "?", model.name) dfs.append(df) From a1e61f2499ec03c10bc6d7ff82581a93ae725332 Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Fri, 24 Jan 2025 15:45:28 -0500 Subject: [PATCH 10/11] removing unwanted capability --- ilamb3/workflow.py | 527 --------------------------------------------- 1 file changed, 527 deletions(-) delete mode 100644 ilamb3/workflow.py diff --git a/ilamb3/workflow.py b/ilamb3/workflow.py deleted file mode 100644 index c76f2dc..0000000 --- a/ilamb3/workflow.py +++ /dev/null @@ -1,527 +0,0 @@ -import glob -import itertools -import logging -import os -import re -import time -import warnings -from inspect import isclass -from pathlib import Path -from traceback import format_exc - -import pandas as pd -import xarray as xr -import yaml -from jinja2 import Template -from mpi4py.futures import MPIPoolExecutor -from tqdm import tqdm - -import ilamb3 -import ilamb3.analysis as anl -import ilamb3.dataset as dset -import ilamb3.models as ilamb_models -from ilamb3.analysis.base import ILAMBAnalysis -from ilamb3.exceptions import AnalysisFailure -from ilamb3.models.base import Model -from ilamb3.regions import Regions - -DEFAULT_ANALYSES = {"Bias": anl.bias_analysis} - - -def open_reference_data(key_or_path: str | Path) -> xr.Dataset: - """ - Load the reference data. - - Parameters - ---------- - key_or_path: str or Path - The key or path to the reference data. First we check if it is a key in the - `ilamb3.ilamb_catalog()` and then if it is a path to a file. That path may be - absolute or relative to an environment variable `ILAMB_ROOT`. - - Returns - ------- - xr.Dataset - The reference data. - """ - key_or_path = Path(key_or_path) - if key_or_path.is_file(): - return xr.open_dataset(key_or_path) - if "ILAMB_ROOT" in os.environ: - root = Path(os.environ["ILAMB_ROOT"]) - if (root / key_or_path).is_file(): - return xr.open_dataset(root / key_or_path) - cat = ilamb3.ilamb_catalog() - key_or_path = str(key_or_path) - if key_or_path in cat: - return cat[key_or_path].read() - raise FileNotFoundError( - f"'{key_or_path}' is not a key in the ilamb3 catalog, nor is it a valid file as an absolute path or relative to ILAMB_ROOT={root}" - ) - - -def _get_logger(logfile: str, reset: bool = False) -> logging.Logger: - - # Where will the log be written? 
- logfile = Path(logfile).expanduser() - if reset and logfile.exists(): - logfile.unlink() - logfile.touch() - - # We need a named logger to avoid other packages that use the root logger - logger = logging.getLogger(str(logfile)) - if not logger.handlers: - # Now setup the file into which we log relevant information - file_handler = logging.FileHandler(logfile) - file_handler.setFormatter( - logging.Formatter( - "[\x1b[36;20m%(asctime)s\033[0m][\x1b[36;20m%(levelname)s\033[0m] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - ) - file_handler.setLevel(logging.INFO) - logger.addHandler(file_handler) - logger.setLevel(logging.INFO) - - # This is probably wrong, but when I log from my logger it logs from parent also - logger.parent.handlers = [] - return logger - - -def work_model_data(work): - - # Unpack the work - model, analysis_setup = work - - # Setup logging - root = Path(ilamb3.conf["build_dir"]) / (analysis_setup["path"]) - logfile = root / f"{model.name}.log" - logger = _get_logger(logfile, reset=True) - - logger.info(f"Begin {analysis_setup['path']} {model.name}") - with warnings.catch_warnings(record=True) as recorded_warnings: - # If a specialized analysis will be required, then it should be specified in the - # analysis_setup. - try: - if "analysis" in analysis_setup: - raise NotImplementedError() - else: - analysis_time = time.time() - df, ds_ref, ds_com = work_default_analysis(model, **analysis_setup) - - # serialize, the reference is tricky as many threads may be trying - df.to_csv(root / f"{model.name}.csv", index=False) - ds_com.to_netcdf(root / f"{model.name}.nc") - ref_file = root / "Reference.nc" - if not ref_file.is_file(): - try: - ds_ref.to_netcdf(ref_file) - logger.info(f"Saved reference file: {ref_file}") - except Exception: - logger.info("Did not save reference file.") - - analysis_time = time.time() - analysis_time - logger.info(f"Analysis completed in {analysis_time:.1f} [s]") - except Exception: - logger.error(format_exc()) - - # Dump the warnings to the logfile - for w in recorded_warnings: - logger.warning(str(w.message)) - - return - - -def load_local_assets(root: str | Path) -> tuple[pd.DataFrame, dict[str, xr.Dataset]]: - root = Path(root) - df = pd.concat( - [ - pd.read_csv(f, keep_default_na=False, na_values=["NaN"]) - for f in glob.glob(str(root / "*.csv")) - ] - ).drop_duplicates(subset=["source", "region", "analysis", "name"]) - df["name"] = df["name"] + " [" + df["units"] + "]" - dsd = { - Path(key).stem: xr.open_dataset(key) for key in glob.glob(str(root / "*.nc")) - } - # consistency checks - assert set(df["source"]) == set(dsd.keys()) - return df, dsd - - -def post_model_data(analysis_setup): - """.""" - # Setup logging - root = Path(ilamb3.conf["build_dir"]) / (analysis_setup["path"]) - logfile = root / "post.log" - logger = _get_logger(logfile, reset=True) - - analyses = setup_analyses(**analysis_setup) - ilamb_regions = Regions() - logger.info(f"Begin post {analysis_setup['path']}") - with warnings.catch_warnings(record=True) as recorded_warnings: - post_time = time.time() - - # Load what we have in the local directory - try: - df, com = load_local_assets(root) - ref = com.pop("Reference") if "Reference" in com else xr.Dataset() - except Exception: - logger.error("An exception was encountered loading local assets.") - logger.error(format_exc()) - return - - # Make plots and write plots - try: - df_plots = [] - for aname, analysis in analyses.items(): - if "plots" in dir(analysis): - dfp = analysis.plots(df, ref, com) - dfp["analysis"] = aname - 
df_plots += [dfp] - df_plots = pd.concat(df_plots, ignore_index=True) - for _, row in df_plots.iterrows(): - row["axis"].get_figure().savefig( - root / f"{row['source']}_{row['region']}_{row['name']}.png" - ) - except Exception: - logger.error("An exception was encountered creating plots") - logger.error(format_exc()) - return - - # Setup template analyses and plots - analyses = {analysis: {} for analysis in df["analysis"].unique()} - for (aname, pname), df_grp in df_plots.groupby( - ["analysis", "name"], sort=False - ): - analyses[aname][pname] = [] - if "Reference" in df_grp["source"].unique(): - analyses[aname][pname] += [ - {"Reference": f"Reference_RNAME_{pname}.png"} - ] - analyses[aname][pname] += [{"Model": f"MNAME_RNAME_{pname}.png"}] - ref_plots = list(df_plots[df_plots["source"] == "Reference"]["name"].unique()) - mod_plots = list(df_plots[df_plots["source"] != "Reference"]["name"].unique()) - - # Write out html - try: - df = df.reset_index(drop=True) - df["id"] = df.index - data = { - "page_header": ref.attrs["header"] if "header" in ref.attrs else "", - "analysis_list": list(analyses.keys()), - "model_names": [m for m in df["source"].unique() if m != "Reference"], - "ref_plots": ref_plots, - "mod_plots": mod_plots, - "regions": { - (None if key == "None" else key): ( - "All Data" if key == "None" else ilamb_regions.get_name(key) - ) - for key in df["region"].unique() - }, - "analyses": analyses, - "data_information": { - key.capitalize(): ref.attrs[key] - for key in ["title", "institutions", "version"] - if key in ref.attrs - }, - "table_data": str( - [row.to_dict() for _, row in df.drop(columns="units").iterrows()] - ), - } - template = open( - Path(ilamb3.__path__[0]) / "templates/dataset_page.html" - ).read() - open(root / "index.html", mode="w").write(Template(template).render(data)) - except Exception: - logger.error("An exception was encountered creating the html page.") - logger.error(format_exc()) - return - - post_time = time.time() - post_time - logger.info(f"Post-processing completed in {post_time:.1f} [s]") - - # Dump the warnings to the logfile - for w in recorded_warnings: - logger.warning(str(w.message)) - - return - - -def setup_analyses(**analysis_setup) -> dict[str, ILAMBAnalysis]: - - # Check on inputs - sources = analysis_setup.get("sources", {}) - relationships = analysis_setup.get("relationships", {}) - if len(sources) != 1: - raise ValueError( - f"The default ILAMB analysis requires a single variable and source, but I found: {sources}" - ) - variable = list(sources.keys())[0] - - # Setup the default analysis - cmap = ( - analysis_setup.pop("variable_cmap") - if "variable_cmap" in analysis_setup - else "viridis" - ) - analyses = { - name: a(variable, variable_cmap=cmap) - for name, a in DEFAULT_ANALYSES.items() - if analysis_setup.get(f"skip_{name.lower()}", False) is False - } - analyses.update( - { - f"rel_{ind_variable}": anl.relationship_analysis(variable, ind_variable) - for ind_variable in relationships - } - ) - return analyses - - -def work_default_analysis(model: Model, **analysis_setup): - - # Check on inputs - sources = analysis_setup.get("sources", {}) - relationships = analysis_setup.get("relationships", {}) - if len(sources) != 1: - raise ValueError( - f"The default ILAMB analysis requires a single variable and source, but I found: {sources}" - ) - variable = list(sources.keys())[0] - - # Setup the default analysis - cmap = ( - analysis_setup.pop("variable_cmap") - if "variable_cmap" in analysis_setup - else "viridis" - ) - analyses = { - 
name: a(variable, variable_cmap=cmap) - for name, a in DEFAULT_ANALYSES.items() - if analysis_setup.get(f"skip_{name.lower()}", False) is False - } - analyses.update( - { - f"rel_{ind_variable}": anl.relationship_analysis(variable, ind_variable) - for ind_variable in relationships - } - ) - - # Get reference data - ref = {v: open_reference_data(s) for v, s in (sources | relationships).items()} - header = f"{variable} | {Path(analysis_setup['path']).name}" - if dset.is_temporal(ref[variable]): - header += f" | {ref[variable]['time'].dt.year.min():d}-{ref[variable]['time'].dt.year.max():d}" - attrs = ref[variable].attrs - if relationships: - # Interpolate relationships to the reference grid - lat = ref[variable][dset.get_dim_name(ref[variable], "lat")] - lon = ref[variable][dset.get_dim_name(ref[variable], "lon")] - for v in relationships: - ref[v] = ref[v].interp( - { - dset.get_dim_name(ref[v], "lat"): lat, - dset.get_dim_name(ref[v], "lon"): lon, - }, - method="nearest", - ) - ref = xr.merge([v for _, v in ref.items()]) - - # Get the model data, only use the measures from the primary variables - com = [ - model.get_variable(v).drop_vars("cell_measures", errors="ignore") - for v in set([v for _, a in analyses.items() for v in a.required_variables()]) - ] - com = xr.merge(com) - - # Run the analyses - dfs = [] - ds_refs = [] - ds_coms = [] - for name, a in analyses.items(): - try: - df, ds_ref, ds_com = a(ref, com, regions=ilamb3.conf["regions"]) - except Exception: - raise AnalysisFailure(name, variable, "?", model.name) - dfs.append(df) - ds_refs.append(ds_ref) - ds_coms.append(ds_com) - - # Merge results - dfs = pd.concat(dfs) - dfs["source"] = dfs["source"].str.replace("Comparison", model.name) - ds_ref = xr.merge(ds_refs).pint.dequantify() - ds_com = xr.merge(ds_coms).pint.dequantify() - ds_ref.attrs = attrs - ds_ref.attrs["header"] = header - - return dfs, ds_ref, ds_com - - -def flatten_dict(d: dict, parent_key: str = "", sep: str = "/") -> dict: - items = [] - for k, v in d.items(): - new_key = parent_key + sep + k if parent_key else k - if isinstance(v, dict) and "sources" not in v: - items.extend(flatten_dict(v, new_key, sep=sep).items()) - else: - items.append((new_key, v)) - return dict(items) - - -def parse_model_setup(yaml_file: str | Path) -> list[ilamb_models.Model]: - """Parse the model setup file.""" - # parse yaml file - yaml_file = Path(yaml_file) - with open(yaml_file) as fin: - setups = yaml.safe_load(fin) - assert isinstance(setups, dict) - - # does this bit belong elsewhere? 
- abstract_models = { - name: model - for name, model in ilamb_models.__dict__.items() - if (isclass(model) and issubclass(model, ilamb_models.Model)) - } - - # setup models, needs error checking - models = [] - for name, setup in tqdm( - setups.items(), - bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt}{postfix}]", - desc="Initializing Models", - unit="model", - ): - if "type" in setup: - model_type = setup.pop("type") - mod = abstract_models[model_type](**setup) - models.append(mod) - else: - paths = setup.pop("paths") - mod = abstract_models["Model"](name=name, **setup).find_files(paths) - models.append(mod) - return models - - -def parse_benchmark_setup(yaml_file: str | Path) -> dict: - """Parse the file which is analagous to the old configure file.""" - yaml_file = Path(yaml_file) - with open(yaml_file) as fin: - analyses = yaml.safe_load(fin) - assert isinstance(analyses, dict) - return analyses - - -def _clean_pathname(filename: str) -> str: - """Removes characters we do not want in our paths.""" - invalid_chars = r'[\\/:*?"<>|\s]' - cleaned_filename = re.sub(invalid_chars, "", filename) - return cleaned_filename - - -def _is_leaf(current: dict) -> bool: - """Is the current item in the nested dictionary a leaf?""" - if not isinstance(current, dict): - return False - if "sources" in current: - return True - return False - - -def _add_path(current: dict, path: Path | None = None) -> dict: - """Recursively add the nested dictionary headings as a `path` in the leaves.""" - path = Path() if path is None else path - for key, val in current.items(): - if not isinstance(val, dict): - continue - key_path = path / Path(_clean_pathname(key)) - if _is_leaf(val): - val["path"] = str(key_path) - else: - current[key] = _add_path(val, key_path) - return current - - -def _to_leaf_list(current: dict, leaf_list: list | None = None) -> list: - """Recursively flatten the nested dictionary only returning the leaves.""" - leaf_list = [] if leaf_list is None else leaf_list - for _, val in current.items(): - if not isinstance(val, dict): - continue - if _is_leaf(val): - leaf_list.append(val) - else: - _to_leaf_list(val, leaf_list) - return leaf_list - - -def _create_paths(current: dict, root: Path = Path("_build")): - """Recursively ensure paths in the leaves are created.""" - for _, val in current.items(): - if not isinstance(val, dict): - continue - if _is_leaf(val): - if "path" in val: - (root / Path(val["path"])).mkdir(parents=True, exist_ok=True) - else: - _create_paths(val, root) - - -def _is_complete(work, root) -> bool: - """Have we already performed this work?""" - model, setup = work - scalar_file = Path(root) / setup["path"] / f"{model.name}.csv" - if scalar_file.is_file(): - return True - return False - - -def run_study(study_setup: str, model_setup: str): - - # Define the models - models = parse_model_setup(model_setup) - - # Some yaml text that would get parsed like a dictionary. - analyses = parse_benchmark_setup(study_setup) - - # The yaml analysis setup can be as structured as the user needs. We are no longer - # limited to the `h1` and `h2` headers from ILAMB 2.x. We will detect leaf nodes by - # the presence of a `sources` dictionary. 
- analyses = _add_path(analyses) - - # Various traversal actions - _create_paths(analyses) - - # Create a list of just the leaves to use in creation all work combinations - analyses_list = _to_leaf_list(analyses) - - # Create a work list but remove things that are already 'done' - work_list = [ - w - for w in itertools.product(models, analyses_list) - if not _is_complete(w, ilamb3.conf["build_dir"]) - ] - - # Phase 1: analysis - if work_list: - with MPIPoolExecutor() as executor: - results = tqdm( - executor.map(work_model_data, work_list), - bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s} {postfix}]", - desc="Analysis of Model-Data Pairs", - unit="pair", - total=len(work_list), - ) - results = list(results) - - # Phase 2: plotting - with MPIPoolExecutor() as executor: - results = tqdm( - executor.map(post_model_data, analyses_list), - bar_format="{desc:>28}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt} [{rate_fmt:>15s} {postfix}]", - desc="Post-processing", - unit="analysis", - total=len(analyses_list), - ) - results = list(results) From 8fe0317877fba99bc499327d857bb6724ce4ebc4 Mon Sep 17 00:00:00 2001 From: Nathan Collier Date: Fri, 24 Jan 2025 16:09:11 -0500 Subject: [PATCH 11/11] fix failing tests --- ilamb3/regions.py | 16 +++++++++++----- pyproject.toml | 1 + uv.lock | 4 +++- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/ilamb3/regions.py b/ilamb3/regions.py index eb68d33..5115162 100644 --- a/ilamb3/regions.py +++ b/ilamb3/regions.py @@ -180,17 +180,23 @@ def restrict_to_region( if isinstance(var, xr.DataArray): out = restrict_to_bbox(var, rlat[0], rlat[1], rlon[0], rlon[1]) else: - out = { - key: restrict_to_bbox(var[key], rlat[0], rlat[1], rlon[0], rlon[1]) - for key in var - } + out = xr.Dataset( + { + key: restrict_to_bbox( + var[key], rlat[0], rlat[1], rlon[0], rlon[1] + ) + for key in var + } + ) return out if rtype == 1: _, _, dar = rdata if isinstance(var, xr.DataArray): out = restrict_to_region(var, dar) else: - out = {key: restrict_to_region(var[key], dar) for key in var} + out = xr.Dataset( + {key: restrict_to_region(var[key], dar) for key in var} + ) return out raise ValueError(f"Region type #{rtype} not recognized") diff --git a/pyproject.toml b/pyproject.toml index 6fdc1fb..362f405 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ dependencies = [ "scipy>=1.15.1", "matplotlib>=3.10.0", "cartopy>=0.24.1", + "pyyaml>=6.0.2", ] [dependency-groups] diff --git a/uv.lock b/uv.lock index 0ce0671..55bfc76 100644 --- a/uv.lock +++ b/uv.lock @@ -651,7 +651,7 @@ wheels = [ [[package]] name = "ilamb3" -version = "2024.5.31.post1.dev7+ge2d377f.d20250124" +version = "2024.5.31.post1.dev19+g7fa5efe.d20250124" source = { editable = "." } dependencies = [ { name = "cartopy" }, @@ -662,6 +662,7 @@ dependencies = [ { name = "pandas" }, { name = "pint-xarray" }, { name = "pooch" }, + { name = "pyyaml" }, { name = "scipy" }, { name = "xarray" }, ] @@ -691,6 +692,7 @@ requires-dist = [ { name = "pandas", specifier = ">=2.2.3" }, { name = "pint-xarray", specifier = ">=0.4" }, { name = "pooch", specifier = ">=1.8.2" }, + { name = "pyyaml", specifier = ">=6.0.2" }, { name = "scipy", specifier = ">=1.15.1" }, { name = "xarray", specifier = ">=2025.1.1" }, ]
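
The header string that an earlier patch in this series attaches to the reference dataset (ds_ref.attrs["header"]), and that post_model_data later reads back as the page_header, is assembled from the variable name, the leaf directory of the analysis path, and the time span of the reference data. Below is a minimal, self-contained sketch of that assembly, not code from the patches themselves: the synthetic dataset, the path "gpp/FLUXCOM", and the plain `"time" in ref.dims` test (standing in for dset.is_temporal) are illustrative assumptions.

    from pathlib import Path

    import numpy as np
    import pandas as pd
    import xarray as xr

    # Synthetic stand-in for the reference dataset returned by open_reference_data()
    ref = xr.Dataset(
        {"gpp": ("time", np.random.rand(24))},
        coords={"time": pd.date_range("1980-01-01", periods=24, freq="MS")},
    )
    variable, path = "gpp", Path("gpp/FLUXCOM")

    header = f"{variable} | {path.name}"
    if "time" in ref.dims:  # stand-in for the dset.is_temporal() check in the patch
        header += f" | {int(ref['time'].dt.year.min())}-{int(ref['time'].dt.year.max())}"
    print(header)  # -> "gpp | FLUXCOM | 1980-1981"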
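
The regions entry added to ilamb3.config defaults to [None] (interpreted as "All Data"), is validated in conf.set() against the labels registered with ilamb3.regions.Regions, and, before workflow.py was removed later in the series, was forwarded into each analysis call as regions=ilamb3.conf["regions"]. A minimal usage sketch under those assumptions follows; the label "euro" is only a placeholder for a region label you have actually registered, not something these patches define.

    import ilamb3

    # Default: a single None region, meaning the analysis runs over all data.
    ilamb3.conf.set(build_dir="./_build", regions=[None])

    # Any additional label must already be registered with ilamb3.regions.Regions();
    # unknown labels make conf.set() raise a ValueError listing the registered ones.
    # ilamb3.conf.set(regions=[None, "euro"])

    # Downstream, the analysis loop received the configured regions, e.g.
    # df, ds_ref, ds_com = analysis(ref, com, regions=ilamb3.conf["regions"])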
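
The regions.py change in the final commit covers the case where restrict_to_region is handed an xr.Dataset: a dict comprehension over its variables yields a plain dict of DataArrays, so the comprehension is now wrapped in xr.Dataset to preserve the return type. A plain-xarray illustration of that distinction, with made-up data and a simple lat/lon slice standing in for ilamb3's restrict_to_bbox:

    import numpy as np
    import xarray as xr

    ds = xr.Dataset(
        {"gpp": (("lat", "lon"), np.random.rand(4, 8))},
        coords={"lat": np.linspace(-75, 75, 4), "lon": np.linspace(-165, 165, 8)},
    )

    def clip(da: xr.DataArray) -> xr.DataArray:
        # stand-in for restrict_to_bbox(da, lat0, latf, lon0, lonf)
        return da.sel(lat=slice(-30, 30), lon=slice(-60, 60))

    as_dict = {key: clip(ds[key]) for key in ds}              # comprehension alone: plain dict
    as_dset = xr.Dataset({key: clip(ds[key]) for key in ds})  # wrapped: stays a Dataset
    assert isinstance(as_dict, dict) and isinstance(as_dset, xr.Dataset)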