diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..a19ade0
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1 @@
+CHANGELOG.md merge=union
diff --git a/.github/workflows/changelog-pr-update.yml b/.github/workflows/changelog-pr-update.yml
index 4bc51df..73cb1eb 100644
--- a/.github/workflows/changelog-pr-update.yml
+++ b/.github/workflows/changelog-pr-update.yml
@@ -5,6 +5,9 @@ on:
     branches:
       - main
       - develop
+    paths-ignore:
+      - .pre-commit-config.yaml
+      - .readthedocs.yaml
 jobs:
   Check-Changelog:
     name: Check Changelog Action
diff --git a/.github/workflows/changelog-release-update.yml b/.github/workflows/changelog-release-update.yml
new file mode 100644
index 0000000..fc0a29a
--- /dev/null
+++ b/.github/workflows/changelog-release-update.yml
@@ -0,0 +1,36 @@
+# .github/workflows/update-changelog.yaml
+name: "Update Changelog"
+
+on:
+  release:
+    types: [released]
+
+  workflow_dispatch: ~
+
+permissions:
+  pull-requests: write
+  contents: write
+
+jobs:
+  update:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+        with:
+          ref: ${{ github.event.release.target_commitish }}
+
+      - name: Update Changelog
+        uses: stefanzweifel/changelog-updater-action@v1
+        with:
+          latest-version: ${{ github.event.release.tag_name }}
+          heading-text: ${{ github.event.release.name }}
+
+      - name: Create Pull Request
+        uses: peter-evans/create-pull-request@v6
+        with:
+          branch: docs/changelog-update-${{ github.event.release.tag_name }}
+          title: '[Changelog] Update to ${{ github.event.release.tag_name }}'
+          add-paths: |
+            CHANGELOG.md
diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml
index 472e29b..54a0818 100644
--- a/.github/workflows/python-publish.yml
+++ b/.github/workflows/python-publish.yml
@@ -1,56 +1,26 @@
+---
 # This workflow will upload a Python Package using Twine when a release is created
 # For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries

 name: Upload Python Package

 on:
-
-  push: {}
-  pull_request:
   release:
     types: [created]

 jobs:
   quality:
-    name: Code QA
-    runs-on: ubuntu-latest
-    steps:
-      - run: sudo apt-get install -y pandoc # Needed by sphinx for notebooks
-      - uses: actions/checkout@v4
-      - uses: actions/setup-python@v5
-        with:
-          python-version: 3.x
-      - uses: pre-commit/action@v3.0.1
+    uses: ecmwf-actions/reusable-workflows/.github/workflows/qa-precommit-run.yml@v2
+    with:
+      skip-hooks: "no-commit-to-branch"

   checks:
     strategy:
-      fail-fast: false
       matrix:
-        platform: ["ubuntu-latest", "macos-latest"]
-        python-version: ["3.10"]
-
-    name: Python ${{ matrix.python-version }} on ${{ matrix.platform }}
-    runs-on: ${{ matrix.platform }}
-
-    steps:
-      - uses: actions/checkout@v4
-
-      - uses: actions/setup-python@v5
-        with:
-          python-version: ${{ matrix.python-version }}
-
-      - name: Install
-        run: |
-          pip install -e .[all,tests]
-          pip freeze
-
-      - name: Tests
-        run: |
-          # disable tests on github to avoid giving away the token
-          # cd tests && python3 test_all.py
+        python-version: ["3.9", "3.10"]
+    uses: ecmwf-actions/reusable-workflows/.github/workflows/qa-pytest-pyproject.yml@v2

   deploy:
-    needs: [checks, quality]
     uses: ecmwf-actions/reusable-workflows/.github/workflows/cd-pypi.yml@v2
     secrets: inherit
diff --git a/.github/workflows/python-pull-request.yml b/.github/workflows/python-pull-request.yml
new file mode 100644
index 0000000..13e4b87
--- /dev/null
+++ b/.github/workflows/python-pull-request.yml
@@ -0,0 +1,19 @@
+---
+# This workflow runs the code quality checks (pre-commit and pytest) on pushes
+# and pull requests, using the ecmwf-actions reusable workflows.
+
+name: Code Quality checks for PRs
+
+on:
+  push:
+  pull_request:
+    types: [opened, synchronize, reopened]
+
+jobs:
+  quality:
+    uses: ecmwf-actions/reusable-workflows/.github/workflows/qa-precommit-run.yml@v2
+    with:
+      skip-hooks: "no-commit-to-branch"
+
+  checks:
+    uses: ecmwf-actions/reusable-workflows/.github/workflows/qa-pytest-pyproject.yml@v2
diff --git a/.gitignore b/.gitignore
index 2137d4c..7839c73 100644
--- a/.gitignore
+++ b/.gitignore
@@ -121,6 +121,7 @@ celerybeat.pid

 # Environments
 .env
+.envrc
 .venv
 env/
 venv/
@@ -186,3 +187,7 @@ _build/
 *.sync
 _version.py
 *.code-workspace
+
+
+# Environments
+.envrc
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index e59c078..38590c6 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -20,6 +20,12 @@ repos:
   - id: no-commit-to-branch # Prevent committing to main / master
   - id: check-added-large-files # Check for large files added to git
   - id: check-merge-conflict # Check for files that contain merge conflict
+- repo: https://github.com/pre-commit/pygrep-hooks
+  rev: v1.10.0 # Use the ref you want to point at
+  hooks:
+  - id: python-use-type-annotations # Check for missing type annotations
+  - id: python-check-blanket-noqa # Check for blanket `# noqa` comments
+  - id: python-no-log-warn # Check for deprecated log.warn calls
 - repo: https://github.com/psf/black-pre-commit-mirror
   rev: 24.10.0
   hooks:
@@ -34,7 +40,7 @@ repos:
     - --force-single-line-imports
     - --profile black
 - repo: https://github.com/astral-sh/ruff-pre-commit
-  rev: v0.7.1
+  rev: v0.7.4
   hooks:
   - id: ruff
     args:
@@ -59,6 +65,20 @@ repos:
   - id: optional-dependencies-all
     args: ["--inplace", "--exclude-keys=dev,docs,tests", "--group=dev=all,docs,tests"]
 - repo: https://github.com/tox-dev/pyproject-fmt
-  rev: "v2.4.3"
+  rev: "v2.5.0"
   hooks:
   - id: pyproject-fmt
+- repo: https://github.com/jshwi/docsig # Check docstrings against function signatures
+  rev: v0.60.1
+  hooks:
+  - id: docsig
+    args:
+    - --ignore-no-params # Allow docstrings without parameters
+    - --check-dunders # Check dunder methods
+    - --check-overridden # Check overridden methods
+    - --check-protected # Check protected methods
+    - --check-class # Check class docstrings
+    - --disable=E113 # Disable empty docstrings
+    - --summary # Print a summary
+ci:
+  autoupdate_schedule: monthly
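For context, the new `docsig` hook fails a pre-commit run when a docstring no longer matches its function signature. A minimal, hypothetical example of the kind of mismatch it reports (not taken from this repository):

```python
def transfer(source, target, overwrite=False):
    """Transfer a dataset between platforms.

    Parameters
    ----------
    source : str
        Origin platform.
    dest : str
        Destination platform.
    """
    # docsig would flag this docstring: it documents `dest` instead of
    # `target`, and `overwrite` is missing from the parameter list.
    return source, target, overwrite
```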
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 38aff34..bf9ae67 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,7 @@ Keep it human-readable, your future self will thank you!
 - Fix docstring errors
 - Fix import errors [#18](https://github.com/ecmwf/anemoi-registry/pull/18)
 - Remove usage of obsolete upload function from anemoi-utils.
+- Add worker to update datasets.

 ### Changed
 - Replaces the deploy workflow with cd-pypi
diff --git a/docs/naming-conventions.rst b/docs/naming-conventions.rst
index ffd605f..c55e079 100644
--- a/docs/naming-conventions.rst
+++ b/docs/naming-conventions.rst
@@ -74,7 +74,7 @@ The tables below provides more details and some examples.
        aifs-od-an-oper-0001-mars-o96-2020-2020-6h-v5

    - - **frequency**
-     - 1h (could be : 6h)
+     - 1h (could be: 6h, or 10m for 10 minutes)

    - - **version**
diff --git a/pyproject.toml b/pyproject.toml
index 76ffb51..0fa2efa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,20 +8,13 @@
 # nor does it submit to any jurisdiction.

 [build-system]
-requires = [
-  "setuptools>=60",
-  "setuptools-scm>=8",
-]
+requires = [ "setuptools>=60", "setuptools-scm>=8" ]

 [project]
 name = "anemoi-registry"
 description = "A package to manage a registry of data-driven forecasts."

-keywords = [
-  "ai",
-  "registry",
-  "tools",
-]
+keywords = [ "ai", "registry", "tools" ]

 license = { file = "LICENSE" }

 authors = [
@@ -45,15 +38,9 @@ classifiers = [
   "Programming Language :: Python :: Implementation :: PyPy",
 ]

-dynamic = [
-  "version",
-]
+dynamic = [ "version" ]

-dependencies = [
-  "anemoi-datasets",
-  "jsonpatch",
-  "requests",
-]
+dependencies = [ "anemoi-datasets", "jsonpatch", "requests" ]

 optional-dependencies.all = [
   "boto3",
@@ -92,9 +79,7 @@ urls.Repository = "https://github.com/ecmwf/anemoi-registry/"
 scripts.anemoi-registry = "anemoi.registry.__main__:main_PYTHON_ARGCOMPLETE_OK"

 [tool.setuptools.package-data]
-"anemoi.registry" = [
-  "*.yaml",
-]
+"anemoi.registry" = [ "*.yaml" ]

 [tool.setuptools_scm]
 version_file = "src/anemoi/registry/_version.py"
diff --git a/src/anemoi/registry/commands/update.py b/src/anemoi/registry/commands/update.py
index 0c7dd0e..3380a03 100644
--- a/src/anemoi/registry/commands/update.py
+++ b/src/anemoi/registry/commands/update.py
@@ -29,6 +29,7 @@ def _shorten(d):


 class Update:
+    """Update a catalogue entry from a recipe, or a zarr file from the catalogue."""

     internal = True
     timestamp = True
@@ -82,12 +83,18 @@ def run(self, args):
         elif args.zarr_file_from_catalogue:
             method = self.zarr_file_from_catalogue

+        def _error(message):
+            LOG.error("%s", message)
+            if not args.ignore:
+                raise ValueError(message)
+            LOG.warning("Continuing with --ignore.")
+
         for path in args.paths:
             if args.resume and path in done:
                 LOG.info(f"Skipping {path}")
                 continue
             try:
-                method(path, args)
+                method(path, _error=_error, **vars(args))
             except Exception as e:
                 if args.continue_:
                     LOG.exception(e)
@@ -97,59 +105,82 @@ def run(self, args):
             with open(args.progress, "a") as f:
                 print(path, file=f)

-    def _error(self, args, message):
-        LOG.error(message)
-        if not args.ignore:
-            raise ValueError(message)
-        LOG.error("%s", message)
-        LOG.warning("Continuing with --ignore.")
+    def catalogue_from_recipe_file(self, path, _error, workdir, dry_run, force, update, ignore, debug, **kwargs):
+        return catalogue_from_recipe_file(
+            path,
+            workdir=workdir,
+            dry_run=dry_run,
+            force=force,
+            update=update,
+            ignore=ignore,
+            debug=debug,
+            _error=_error,
+        )

-    def catalogue_from_recipe_file(self, path, args):
-        """Update the catalogue entry a recipe file."""
+    def zarr_file_from_catalogue(self, path, _error, dry_run, ignore, **kwargs):
+        return zarr_file_from_catalogue(path, dry_run=dry_run, ignore=ignore, _error=_error)

-        from anemoi.datasets import open_dataset
-        from anemoi.datasets.create import creator_factory

-        def entry_set_value(path, value):
-            if args.dry_run:
-                LOG.info(f"Would set value {path} to {_shorten(value)}")
-            else:
-                LOG.info(f"Setting value {path} to {_shorten(value)}")
-            entry.set_value(path, value)
+def catalogue_from_recipe_file(path, *, workdir, dry_run, force, update, ignore, debug, _error=print):
+    """Update the catalogue entry from a recipe file."""

-        LOG.info(f"Updating catalogue entry from recipe: {path}")
+    from anemoi.datasets import open_dataset
+    from anemoi.datasets.create import creator_factory

-        with open(path) as f:
-            recipe = yaml.safe_load(f)
+    def entry_set_value(path, value):
+        if dry_run:
+            LOG.info(f"Would set value {path} to {_shorten(value)}")
+        else:
+            LOG.info(f"Setting value {path} to {_shorten(value)}")
+        entry.set_value(path, value)

-        if "name" not in recipe:
-            self._error(args, "Recipe does not contain a 'name' field.")
-            return
+    LOG.info(f"Updating catalogue entry from recipe: {path}")

-        name = recipe["name"]
-        base, _ = os.path.splitext(os.path.basename(path))
+    with open(path) as f:
+        recipe = yaml.safe_load(f)

-        if name != base:
-            self._error(args, f"Recipe name '{name}' does not match file name '{path}'")
+    if "name" not in recipe:
+        _error("Recipe does not contain a 'name' field.")
+        return

-        try:
-            entry = Dataset(name, params={"_": True})
-        except CatalogueEntryNotFound:
-            if args.ignore:
-                LOG.error(f"Entry not found: {name}")
-                return
-            raise
+    name = recipe["name"]
+    base, _ = os.path.splitext(os.path.basename(path))

-        updated = entry.record["metadata"].get("updated", 0)
+    if name != base:
+        _error(f"Recipe name '{name}' does not match file name '{path}'")

-        if "recipe" in entry.record["_original"]["metadata"]:
-            LOG.info("%s: `recipe` already in original. Use --force and --update to update", name)
-            if not args.update or not args.force:
-                return
+    try:
+        entry = Dataset(name, params={"_": True})
+    except CatalogueEntryNotFound:
+        if ignore:
+            LOG.error(f"Entry not found: {name}")
+            return
+        raise

-        if "recipe" not in entry.record["metadata"] or args.force:
-            LOG.info("%s, setting `constant_fields` 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥", name)
-            if args.dry_run:
+    updated = entry.record["metadata"].get("updated", 0)
+
+    if "recipe" in entry.record["_original"]["metadata"]:
+        LOG.info("%s: `recipe` already in original. Use --force and --update to update", name)
+        if not update or not force:
+            return
+
+    # Remove stuff added by prepml
+    for k in [
+        "build_dataset",
+        "config_format_version",
+        "config_path",
+        "dataset_status",
+        "ecflow",
+        "metadata",
+        "platform",
+        "reading_chunks",
+        "upload",
+    ]:
+        recipe.pop(k, None)
+
+    if "recipe" not in entry.record["metadata"] or force:
+        LOG.info("%s, setting `recipe` 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥", name)
+        if dry_run:
             LOG.info("Would set recipe %s", name)
         else:
             LOG.info("Setting recipe %s", name)
@@ -157,165 +188,171 @@ def entry_set_value(path, value):
             entry_set_value("/metadata/recipe", recipe)
             entry_set_value("/metadata/updated", updated + 1)

+    computed_constant_fields = sorted(open_dataset(name).computed_constant_fields())
+    constant_fields = entry.record["metadata"].get("constant_fields", [])
+    if computed_constant_fields != constant_fields:
+        LOG.info("%s, setting `constant_fields`", name)
+        if dry_run:
+            LOG.info("Would set constant_fields %s", name)
+        else:
+            LOG.info("Setting constant_fields %s", name)
+            entry_set_value("/metadata/constant_fields", computed_constant_fields)
+            entry_set_value("/metadata/updated", updated + 1)
+            entry.record["metadata"]["constant_fields"] = computed_constant_fields
+
     if "constant_fields" in entry.record["metadata"] and "variables_metadata" in entry.record["metadata"]:
         LOG.info("%s, checking `variables_metadata` and `constant_fields`", name)
         constants = entry.record["metadata"]["constant_fields"]
         variables_metadata = entry.record["metadata"]["variables_metadata"]
-            changed = False
-            for k, v in variables_metadata.items():
+        changed = False
+        for k, v in variables_metadata.items():
+
+            if k in constants and v.get("constant_in_time") is not True:
+                v["constant_in_time"] = True
+                changed = True
+                LOG.info(f"Setting {k} constant_in_time to True")
+
+            if "is_constant_in_time" in v:
+                del v["is_constant_in_time"]
+                changed = True
+
+        if changed:
+            if debug:
+                with open(f"{name}.variables_metadata.json", "w") as f:
+                    print(json.dumps(variables_metadata, indent=2), file=f)
+
+            entry_set_value("/metadata/variables_metadata", variables_metadata)
+            entry_set_value("/metadata/updated", updated + 1)
+        else:
+            LOG.info("No changes required")
+
+    if "variables_metadata" not in entry.record["metadata"] or force:
+        LOG.info("%s, setting `variables_metadata` 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥", name)
+
+        if dry_run:
+            LOG.info("Would set `variables_metadata` %s", name)
+        else:
-                if k in constants and v.get("constant_in_time") is not True:
-                    v["constant_in_time"] = True
-                    changed = True
-                    LOG.info(f"Setting {k} constant_in_time to True")
+            dir = os.path.join(workdir, f"anemoi-registry-commands-update-{time.time()}")
+            os.makedirs(dir)

-                if "is_constant_in_time" in v:
-                    del v["is_constant_in_time"]
-                    changed = True
+            try:
+                tmp = os.path.join(dir, "tmp.zarr")

-            if changed:
-                if args.debug:
+                creator_factory("init", config=path, path=tmp, overwrite=True).run()
+
+                with open(f"{tmp}/.zattrs") as f:
+                    variables_metadata = yaml.safe_load(f)["variables_metadata"]
+                if debug:
                     with open(f"{name}.variables_metadata.json", "w") as f:
                         print(json.dumps(variables_metadata, indent=2), file=f)
+                LOG.info("Setting variables_metadata %s", name)
                 entry_set_value("/metadata/variables_metadata", variables_metadata)
                 entry_set_value("/metadata/updated", updated + 1)
-            else:
-                LOG.info("No changes required")

-        if "variables_metadata" not in entry.record["metadata"] or args.force:
-            LOG.info("%s, setting `variables_metadata` 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥", name)
+            finally:
+                shutil.rmtree(dir)

-            if args.dry_run:
-                LOG.info("Would set `variables_metadata` %s", name)
-            else:
-                dir = os.path.join(args.workdir, f"anemoi-registry-commands-update-{time.time()}")
-                os.makedirs(dir)
-
-                try:
-                    tmp = os.path.join(dir, "tmp.zarr")
-
-                    creator_factory("init", config=path, path=tmp, overwrite=True).run()
-
-                    with open(f"{tmp}/.zattrs") as f:
-                        variables_metadata = yaml.safe_load(f)["variables_metadata"]
-                    if args.debug:
-                        with open(f"{name}.variables_metadata.json", "w") as f:
-                            print(json.dumps(variables_metadata, indent=2), file=f)
-                    LOG.info("Setting variables_metadata %s", name)
-                    entry_set_value("/metadata/variables_metadata", variables_metadata)
-                    entry_set_value("/metadata/updated", updated + 1)
-
-                finally:
-                    shutil.rmtree(dir)
-
-        if "constant_fields" not in entry.record["metadata"] or args.force:
-            LOG.info("%s, setting `constant_fields` 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥", name)
-            ds = open_dataset(name)
-            constant_fields = ds.computed_constant_fields()
-            LOG.info("%s", constant_fields)
-            if args.debug:
-                with open(f"{name}.constant_fields.json", "w") as f:
-                    print(json.dumps(constant_fields, indent=2), file=f)
-            entry_set_value("/metadata/constant_fields", constant_fields)
-            entry_set_value("/metadata/updated", updated + 1)

+def zarr_file_from_catalogue(path, *, dry_run, ignore, _error=print):
+    import zarr

-    def zarr_file_from_catalogue(self, path, args):
-        import zarr
+    LOG.info(f"Updating zarr file from catalogue: {path}")

-        LOG.info(f"Updating zarr file from catalogue: {path}")
+    if not os.path.exists(path) and not path.startswith("s3://"):
+        _error(f"File not found: {path}")
+        return

-        z = zarr.open(path)
-        metadata = z.attrs.asdict()
+    z = zarr.open(path)
+    metadata = z.attrs.asdict()

-        if "uuid" not in metadata:
-            self._error(args, "Zarr metadata does not have a 'uuid' field.")
+    if "uuid" not in metadata:
+        _error("Zarr metadata does not have a 'uuid' field.")

-        match = None
-        for e in DatasetCatalogueEntryList().get(params={"uuid": metadata["uuid"]}):
-            if match:
-                self._error(args, f"Multiple entries found for uuid {metadata['uuid']}")
-            match = e
+    match = None
+    for e in DatasetCatalogueEntryList().get(params={"uuid": metadata["uuid"]}):
+        if match:
+            _error(f"Multiple entries found for uuid {metadata['uuid']}")
+        match = e

-        if match is None:
-            self._error(args, f"No entry found for uuid {metadata['uuid']}")
+    if match is None:
+        _error(f"No entry found for uuid {metadata['uuid']}")

-        name = match["name"]
-        base, _ = os.path.splitext(os.path.basename(path))
+    name = match["name"]
+    base, _ = os.path.splitext(os.path.basename(path))

-        if name != base:
-            self._error(args, f"Metadata name '{name}' does not match file name '{path}'")
+    if name != base:
+        _error(f"Metadata name '{name}' does not match file name '{path}'")

-        try:
-            entry = Dataset(name)
-        except CatalogueEntryNotFound:
-            if args.force:
-                LOG.error(f"Entry not found: {name}")
-                return
-            raise
+    try:
+        entry = Dataset(name)
+    except CatalogueEntryNotFound:
+        if ignore:
+            LOG.error(f"Entry not found: {name}")
+            return
+        raise

-        def dict_are_different(d1, d2, path=""):
+    def dict_are_different(d1, d2, path=""):

-            def _(d):
-                return textwrap.shorten(json.dumps(d, ensure_ascii=False), width=80, placeholder="...")
+        def _(d):
+            return textwrap.shorten(json.dumps(d, ensure_ascii=False), width=80, placeholder="...")

-            diff = False
+        diff = False

-            if d1 == d2:
-                return False
+        if d1 == d2:
+            return False

-            if type(d1) is not type(d2):
-                print(f"Type mismatch at {path}: {type(d1)} != {type(d2)}")
-                return True
+        if type(d1) is not type(d2):
+            print(f"Type mismatch at {path}: {type(d1)} != {type(d2)}")
+            return True

-            if isinstance(d1, dict) and isinstance(d2, dict):
-                for k in d1.keys():
-                    if k not in d2:
-                        print(f"Key {path + '.' + k} is missing in the local dictionary {_(d1[k])}")
-                        diff = True
+        if isinstance(d1, dict) and isinstance(d2, dict):
+            for k in d1.keys():
+                if k not in d2:
+                    print(f"Key {path + '.' + k} is missing in the local dictionary {_(d1[k])}")
+                    diff = True

-                    if k in d1 and k in d2 and dict_are_different(d1[k], d2[k], path + "." + k):
-                        diff = True
+                if k in d1 and k in d2 and dict_are_different(d1[k], d2[k], path + "." + k):
+                    diff = True

-                for k in d2.keys():
-                    if k not in d1:
-                        print(f"Key {path + '.' + k} is missing in the remote dictionary {_(d2[k])}")
-                        diff = True
+            for k in d2.keys():
+                if k not in d1:
+                    print(f"Key {path + '.' + k} is missing in the remote dictionary {_(d2[k])}")
+                    diff = True

-                return diff
+            return diff

-            if isinstance(d1, list) and isinstance(d2, list):
-                if len(d1) != len(d2):
-                    print(f"List length mismatch at {path}: {len(d1)} != {len(d2)}")
-                    return True
+        if isinstance(d1, list) and isinstance(d2, list):
+            if len(d1) != len(d2):
+                print(f"List length mismatch at {path}: {len(d1)} != {len(d2)}")
+                return True

-                for i, (a, b) in enumerate(zip(d1, d2)):
-                    if dict_are_different(a, b, path + f"[{i}]"):
-                        diff = True
+            for i, (a, b) in enumerate(zip(d1, d2)):
+                if dict_are_different(a, b, path + f"[{i}]"):
+                    diff = True

-                return diff
+            return diff

-            if d1 != d2:
-                print(f"Value differs at {path}: {d1} != {d2}")
-                return True
+        if d1 != d2:
+            print(f"Value differs at {path}: {d1} != {d2}")
+            return True

-            return diff
+        return diff

-        # Example usage
-        entry_metadata = entry.record["metadata"]
-        diff = dict_are_different(entry_metadata, metadata)
+    # Compare the local zarr metadata with the catalogue entry
+    entry_metadata = entry.record["metadata"]
+    diff = dict_are_different(entry_metadata, metadata)

-        if not diff:
-            LOG.info(f"Metadata is up to date: {name}")
-            return
+    if not diff:
+        LOG.info(f"Metadata is up to date: {name}")
+        return

-        if args.dry_run:
-            return
+    if dry_run:
+        return

-        z = zarr.open(path, mode="a")
-        LOG.info(f"Updating metadata: {name}")
-        z.attrs.update(entry_metadata)
+    z = zarr.open(path, mode="a")
+    LOG.info(f"Updating metadata: {name}")
+    z.attrs.update(entry_metadata)


 command = Update
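Since `catalogue_from_recipe_file` and `zarr_file_from_catalogue` are now module-level functions rather than methods on `Update`, they can be reused outside the CLI (the new `update-dataset` worker below does exactly this). A sketch of a programmatic call, assuming valid catalogue credentials; the path is illustrative:

```python
from anemoi.registry.commands.update import zarr_file_from_catalogue

def fail(message):
    # Custom error callback: abort instead of the default `print`
    raise RuntimeError(message)

# Re-sync a local zarr store with its catalogue entry.
zarr_file_from_catalogue(
    "/data/datasets/aifs-ea-an-oper-0001-mars-20p0-1979-1979-6h-v0.zarr",
    dry_run=True,  # only report differences, do not write
    ignore=False,
    _error=fail,
)
```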
diff --git a/src/anemoi/registry/commands/update.readme.md b/src/anemoi/registry/commands/update.readme.md
new file mode 100644
index 0000000..a7c9e9e
--- /dev/null
+++ b/src/anemoi/registry/commands/update.readme.md
@@ -0,0 +1,90 @@
+# Dataset Metadata Update Tool
+
+This tool synchronizes metadata between dataset recipe files, catalogue entries, and Zarr datasets, ensuring consistency across the different representations of dataset metadata.
+
+## Prerequisites
+
+- Up-to-date `anemoi-datasets` package in the environment
+- Admin access to the catalogue using `anemoi-registry` (needs credentials *and* a specific configuration in the anemoi config file)
+
+## Task 1: Update Catalogue from Recipe Files
+
+### Purpose
+Updates catalogue entries with the latest metadata derived from recipe files. This involves creating a temporary minimal dataset to extract current metadata values. Note that not all metadata is blindly copied from the minimal dataset: `uuid` and the list of dates, for instance, should not be updated.
+
+### Command
+```bash
+anemoi-registry update --catalogue-from-recipe-file [--workdir DIR] [options] recipe_files...
+```
+
+### Process
+1. Creates a temporary minimal dataset using the "init" creator
+2. Extracts current metadata, including:
+   - Recipe information
+   - Variables metadata
+   - Additional information when appropriate
+3. Updates the catalogue entry with the new metadata
+
+### Work Directory
+A working directory is required to create the temporary datasets. By default it is the current directory (`.`); a different directory can be chosen with the `--workdir` option.
+
+### Options for Catalogue Update
+- `--dry-run`: Preview changes without applying them
+- `--force`: Update even if entries already exist
+- `--update`: Allow updating of existing entries
+- `--ignore`: Continue despite trivial errors
+- `--workdir DIR`: Specify working directory (default: ".")
+
+### Example
+```bash
+# Update a single recipe
+anemoi-registry update --catalogue-from-recipe-file recipe.yaml
+
+# Update multiple recipes, updating existing entries and overwriting metadata that is already there
+anemoi-registry update --catalogue-from-recipe-file --force --update *.yaml
+```
+
+## Task 2: Update Zarr from Catalogue
+
+### Purpose
+Synchronizes Zarr metadata with the corresponding metadata from the catalogue, ensuring consistency between stored data and catalogue information.
+
+### Command
+```bash
+anemoi-registry update --zarr-file-from-catalogue [options] zarr_files...
+```
+
+### Process
+1. Identifies the catalogue entry using the Zarr file's UUID
+2. Compares the existing Zarr metadata with the catalogue metadata
+3. Updates the Zarr metadata if differences are found
+
+### Options for Zarr Update
+- `--dry-run`: Preview changes without applying them
+- `--ignore`: Continue despite non-critical errors
+- `--resume`: Resume from previous progress
+- `--progress FILE`: Specify progress file for resuming
+
+### Example
+```bash
+# Update a single Zarr file
+anemoi-registry update --zarr-file-from-catalogue dataset.zarr
+
+# Update multiple Zarr files with progress tracking
+anemoi-registry update --zarr-file-from-catalogue --progress progress.txt data/*.zarr
+```
+
+## Common Options
+
+These options work for both tasks:
+- `--dry-run`: Show what would be done without making changes
+- `--ignore`: Continue processing despite trivial errors
+- `--continue`: Continue to the next file on error
+
+## Notes
+
+1. Always use an up-to-date version of the `anemoi-datasets` package
+2. Use `--dry-run` to verify changes before applying them
+3. For batch operations, consider using `--progress` to track completion
+4. Detailed logging is provided to track the update process
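The `--progress`/`--resume` pair described above maps to a simple idempotence pattern in `Update.run`: each processed path is appended to the progress file, and a resumed run skips paths already listed there. A minimal sketch of that pattern, with hypothetical names:

```python
import os

PROGRESS_FILE = "progress.txt"  # hypothetical path

def already_done():
    # Paths recorded by earlier (possibly interrupted) runs
    if not os.path.exists(PROGRESS_FILE):
        return set()
    with open(PROGRESS_FILE) as f:
        return {line.strip() for line in f}

def process_all(paths, process):
    done = already_done()
    for path in paths:
        if path in done:
            continue  # --resume: skip work recorded in the progress file
        process(path)
        with open(PROGRESS_FILE, "a") as f:
            print(path, file=f)  # record completion, one path per line
```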
diff --git a/src/anemoi/registry/commands/worker.py b/src/anemoi/registry/commands/worker.py
index 0cf8aa5..cc256e9 100644
--- a/src/anemoi/registry/commands/worker.py
+++ b/src/anemoi/registry/commands/worker.py
@@ -45,6 +45,10 @@ def add_arguments(self, command_parser):
         transfer.add_argument("--threads", help="Number of threads to use", type=int)
         transfer.add_argument("--filter-tasks", help="Filter tasks to process (key=value list)", nargs="*", default=[])

+        update = subparsers.add_parser("update-dataset", help="Update datasets from the catalogue")
+        update.add_argument("--destination", help="Platform destination (e.g. leonardo, lumi, marenostrum)")
+        update.add_argument("--directory", help="The directory where the datasets are located.")
+
         delete = subparsers.add_parser("delete-dataset", help="Delete dataset")
         delete.add_argument("--platform", help="Platform destination (e.g. leonardo, lumi, marenostrum)")
         delete.add_argument("--filter-tasks", help="Filter tasks to process (key=value list)", nargs="*", default=[])
@@ -52,7 +56,7 @@ def add_arguments(self, command_parser):
         dummy = subparsers.add_parser("dummy", help="Dummy worker for test purposes")
         dummy.add_argument("--arg")

-        for subparser in [transfer, delete, dummy]:
+        for subparser in [transfer, delete, dummy, update]:
             subparser.add_argument("--timeout", help="Die with timeout (SIGALARM) after TIMEOUT seconds.", type=int)
             subparser.add_argument("--wait", help="Check for new task every WAIT seconds.", type=int)
             subparser.add_argument("--heartbeat", help="Heartbeat interval", type=int)
diff --git a/src/anemoi/registry/entry/__init__.py b/src/anemoi/registry/entry/__init__.py
index fdac5b1..ae71c2a 100644
--- a/src/anemoi/registry/entry/__init__.py
+++ b/src/anemoi/registry/entry/__init__.py
@@ -28,6 +28,8 @@ class CatalogueEntryNotFound(Exception):


 class CatalogueEntry:
+    """Base class for an Anemoi catalogue entry."""
+
     record = None
     path = None
     key = None
diff --git a/src/anemoi/registry/entry/dataset.py b/src/anemoi/registry/entry/dataset.py
index 0972551..4310321 100644
--- a/src/anemoi/registry/entry/dataset.py
+++ b/src/anemoi/registry/entry/dataset.py
@@ -28,6 +28,8 @@


 class DatasetCatalogueEntryList(RestItemList):
+    """List of dataset catalogue entries."""
+
     def __init__(self, **kwargs):
         super().__init__(COLLECTION, **kwargs)

@@ -37,6 +39,8 @@ def __iter__(self):


 class DatasetCatalogueEntry(CatalogueEntry):
+    """A dataset catalogue entry."""
+
     collection = COLLECTION
     main_key = "name"
diff --git a/src/anemoi/registry/entry/experiment.py b/src/anemoi/registry/entry/experiment.py
index afcd9f2..9861ad1 100644
--- a/src/anemoi/registry/entry/experiment.py
+++ b/src/anemoi/registry/entry/experiment.py
@@ -31,6 +31,8 @@


 class ExperimentCatalogueEntryList(RestItemList):
+    """List of ExperimentCatalogueEntry objects."""
+
     def __init__(self, **kwargs):
         super().__init__(COLLECTION, **kwargs)

@@ -40,6 +42,8 @@ def __iter__(self):


 class ExperimentCatalogueEntry(CatalogueEntry):
+    """Catalogue entry for an experiment."""
+
     collection = COLLECTION
     main_key = "expver"
diff --git a/src/anemoi/registry/entry/weights.py b/src/anemoi/registry/entry/weights.py
index 3bb7517..81ac244 100644
--- a/src/anemoi/registry/entry/weights.py
+++ b/src/anemoi/registry/entry/weights.py
@@ -25,6 +25,8 @@


 class WeightsCatalogueEntryList(RestItemList):
+    """List of weights catalogue entries."""
+
     def __init__(self, **kwargs):
         super().__init__(COLLECTION, **kwargs)
diff --git a/src/anemoi/registry/rest.py b/src/anemoi/registry/rest.py
index e87decc..9c7effc 100644
--- a/src/anemoi/registry/rest.py
+++ b/src/anemoi/registry/rest.py
@@ -64,6 +64,8 @@ def trace_info():


 class Rest:
+    """REST API client."""
+
     def __init__(self):
         self.session = requests.Session()
         self.session.headers.update({"Authorization": f"Bearer {self.token}"})
@@ -154,6 +156,8 @@ def raise_for_status(self, r, errors={}):


 class RestItem:
+    """Single catalogue entry from the REST API."""
+
     def __init__(self, collection, key):
         self.collection = collection
         self.key = key
@@ -188,6 +192,8 @@ def __repr__(self):


 class RestItemList:
+    """List of catalogue entries from the REST API."""
+
     def __init__(self, collection):
         self.collection = collection
         self.rest = Rest()
diff --git a/src/anemoi/registry/tasks.py b/src/anemoi/registry/tasks.py
index ef36b1e..d63f157 100644
--- a/src/anemoi/registry/tasks.py
+++ b/src/anemoi/registry/tasks.py
@@ -23,6 +23,8 @@


 class TaskCatalogueEntryList:
+    """List of task catalogue entries."""
+
     collection = "tasks"
     main_key = "uuid"
diff --git a/src/anemoi/registry/workers/__init__.py b/src/anemoi/registry/workers/__init__.py
index 1da3220..6c829b2 100644
--- a/src/anemoi/registry/workers/__init__.py
+++ b/src/anemoi/registry/workers/__init__.py
@@ -17,6 +17,7 @@ from anemoi.utils.humanize import when

 from anemoi.registry import config
+from anemoi.registry.tasks import TaskCatalogueEntry
 from anemoi.registry.tasks import TaskCatalogueEntryList

 # from anemoi.utils.provenance import trace_info
@@ -25,6 +26,8 @@


 class Worker:
+    """Base class for a worker that processes tasks in the queue."""
+
     name = None

     def __init__(
@@ -129,7 +132,8 @@ def send_heartbeat():
             thread.join()

     @classmethod
-    def parse_task(cls, task, *keys):
+    def parse_task(cls, task: TaskCatalogueEntry, *keys: str):
+        """Parse a task (from the catalogue) and return a list of values for the given keys."""
         data = task.record.copy()

         assert isinstance(data, dict), data
@@ -206,6 +210,7 @@ def run_worker(action, **kwargs):

     from .delete_dataset import DeleteDatasetWorker
     from .transfer_dataset import TransferDatasetWorker
+    from .update_dataset import UpdateDatasetWorker

     workers_config = config().get("workers", {})
     worker_config = workers_config.get(action, {})
@@ -229,6 +234,7 @@ def run_worker(action, **kwargs):
     cls = {
         "transfer-dataset": TransferDatasetWorker,
         "delete-dataset": DeleteDatasetWorker,
+        "update-dataset": UpdateDatasetWorker,
         "dummy": DummyWorker,
     }[action]
     cls(**kwargs).run()
diff --git a/src/anemoi/registry/workers/delete_dataset.py b/src/anemoi/registry/workers/delete_dataset.py
index 3b891cd..5caafa7 100644
--- a/src/anemoi/registry/workers/delete_dataset.py
+++ b/src/anemoi/registry/workers/delete_dataset.py
@@ -19,6 +19,8 @@


 class DeleteDatasetWorker(Worker):
+    """Worker to delete a dataset from a platform."""
+
     name = "delete-dataset"

     def __init__(
diff --git a/src/anemoi/registry/workers/dummy.py b/src/anemoi/registry/workers/dummy.py
index c015721..df689c6 100644
--- a/src/anemoi/registry/workers/dummy.py
+++ b/src/anemoi/registry/workers/dummy.py
@@ -16,6 +16,8 @@


 class DummyWorker(Worker):
+    """A debug worker that logs the task it receives."""
+
     name = "dummy"

     def __init__(self, arg, **kwargs):
diff --git a/src/anemoi/registry/workers/transfer_dataset.py b/src/anemoi/registry/workers/transfer_dataset.py
index 6307df5..23b6b4f 100644
--- a/src/anemoi/registry/workers/transfer_dataset.py
+++ b/src/anemoi/registry/workers/transfer_dataset.py
@@ -21,6 +21,8 @@


 class Progress:
+    """Progress reporter for transfer tasks."""
+
     latest = None

     def __init__(self, task, frequency=60):
@@ -65,6 +67,8 @@ def __call__(self, number_of_files, total_size, total_transferred, transfering,


 class TransferDatasetWorker(Worker):
+    """Worker to transfer a dataset from one platform to another."""
+
     name = "transfer-dataset"

     def __init__(
diff --git a/src/anemoi/registry/workers/update_dataset.py b/src/anemoi/registry/workers/update_dataset.py
new file mode 100644
index 0000000..4746272
--- /dev/null
+++ b/src/anemoi/registry/workers/update_dataset.py
@@ -0,0 +1,94 @@
+# (C) Copyright 2024 Anemoi contributors.
+#
+# This software is licensed under the terms of the Apache Licence Version 2.0
+# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
+#
+# In applying this licence, ECMWF does not waive the privileges and immunities
+# granted to it by virtue of its status as an intergovernmental organisation
+# nor does it submit to any jurisdiction.
+
+
+import logging
+import os
+
+from anemoi.registry.entry.dataset import DatasetCatalogueEntry
+
+from . import Worker
+
+LOG = logging.getLogger(__name__)
+
+
+class UpdateDatasetWorker(Worker):
+    """Patch datasets from the catalogue."""
+
+    name = "update-dataset"
+
+    def __init__(
+        self,
+        destination,
+        directory=None,
+        filter_tasks={},
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.destination = destination
+        self.directory = directory
+        if not self.destination:
+            raise ValueError("No destination platform specified")
+
+        if not self.directory:
+            # in this case we should get the directory from the catalogue
+            raise NotImplementedError("No directory specified")
+
+        if self.directory:
+            if not os.path.exists(self.directory):
+                raise ValueError(f"Directory {self.directory} does not exist")
+
+        self.filter_tasks.update(filter_tasks)
+        self.filter_tasks["destination"] = self.destination
+
+    def worker_process_task(self, task):
+        from anemoi.registry.commands.update import zarr_file_from_catalogue
+
+        (destination,) = self.parse_task(task)
+        assert destination == self.destination, (destination, self.destination)
+
+        for path in os.listdir(self.directory):
+            path = os.path.join(self.directory, path)
+            if not path.endswith(".zarr"):
+                continue
+            name = os.path.basename(path)[:-5]
+
+            LOG.info(f"Updating dataset '{name}' from catalogue on '{destination}'")
+
+            def check_path(name, path):
+                entry = DatasetCatalogueEntry(key=name)
+                locations = entry.record["locations"]
+                if destination not in locations:
+                    raise ValueError(
+                        f"Platform '{destination}' not registered as a location in the catalogue. Not updating."
+                    )
+                catalogue_path = locations[self.destination]["path"]
+                if os.path.realpath(path) != os.path.realpath(catalogue_path):
+                    raise ValueError(f"Path '{path}' does not match catalogue path {catalogue_path}. Not updating.")
+
+            try:
+                check_path(name, path)
+                zarr_file_from_catalogue(path, dry_run=False, ignore=False, _error=print)
+            except Exception as e:
+                LOG.error(f"Error updating {path}: {e}")
+                continue
+
+    @classmethod
+    def parse_task(cls, task):
+        assert task.record["action"] == "update-dataset", task.record["action"]
+
+        destination = super().parse_task(task, "destination")
+
+        if "/" in destination:
+            raise ValueError(f"Destination '{destination}' must not contain '/', this is a platform name")
+        if "." in destination:
+            raise ValueError(f"Destination '{destination}' must not contain '.', this is a platform name")
+
+        return destination
diff --git a/tests/dummy-recipe-dataset.yaml b/tests/recipe.yaml
similarity index 100%
rename from tests/dummy-recipe-dataset.yaml
rename to tests/recipe.yaml
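`run_worker` (above) maps the `update-dataset` action to this new worker, so it can also be constructed directly. A sketch under assumed values; the platform name and directory are illustrative:

```python
from anemoi.registry.workers.update_dataset import UpdateDatasetWorker

# Roughly what run_worker("update-dataset", ...) does once the worker
# config has been merged; "lumi" and the path are placeholder values.
worker = UpdateDatasetWorker(destination="lumi", directory="/data/anemoi-datasets")
worker.run()  # poll the task queue, then patch each local *.zarr from the catalogue
```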
diff --git a/tests/test_all.py b/tests/test_all.py
old mode 100755
new mode 100644
index dd562bb..ace762c
--- a/tests/test_all.py
+++ b/tests/test_all.py
@@ -8,7 +8,13 @@
 # nor does it submit to any jurisdiction.

 import os
+import shutil
 import subprocess
+import uuid
+
+import yaml
+import zarr
+from anemoi.utils.remote import transfer

 DATASET = "aifs-ea-an-oper-0001-mars-20p0-1979-1979-6h-v0-testing"
 DATASET_PATH = f"{DATASET}.zarr"
@@ -17,60 +23,116 @@
 TMP_DATASET = f"{DATASET}-{pid}"
 TMP_DATASET_PATH = f"{TMP_DATASET}.zarr"

+TMP_RECIPE = f"./{TMP_DATASET}.yaml"
+
+DATASET_URL = "s3://ml-tests/test-data/anemoi-datasets/create/pipe.zarr/"


 def run(*args):
     print(" ".join(args))
     try:
-        subprocess.check_call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+        result.check_returncode()
     except Exception as e:
+        print("----------------STDOUT----------------")
+        print(result.stdout)
+        print("----------------STDERR----------------")
+        print(result.stderr)
+        print("--------------------------------------")
         e.add_note = f"Command failed: {' '.join(args)}"
         raise


-def setup_module():
-    teardown_module(raise_if_error=False)
+def setup_experiments():
     run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml", "--register")
     run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml")

+
+def setup_checkpoints():
     run("anemoi-registry", "weights", "./dummy-checkpoint.ckpt", "--register")
     run("anemoi-registry", "weights", "./dummy-checkpoint.ckpt")

+
+def setup_datasets():
+    # cache to avoid downloading the dataset when re-running the tests
     if not os.path.exists(DATASET_PATH):
-        run("anemoi-datasets", "create", "./dummy-recipe-dataset.yaml", DATASET_PATH, "--overwrite")
+        transfer(DATASET_URL, DATASET_PATH, overwrite=True)
     assert os.path.exists(DATASET_PATH)

-    os.symlink(DATASET_PATH, TMP_DATASET_PATH)
+    # create a temporary recipe file with the right name
+    with open("./recipe.yaml", "r") as f:
+        r = yaml.load(f, Loader=yaml.FullLoader)
+    r["name"] = TMP_DATASET
+    with open(TMP_RECIPE, "w") as f:
+        yaml.dump(r, f)
+
+    # set a new uuid on the tmp dataset
+    shutil.copytree(DATASET_PATH, TMP_DATASET_PATH)
+    z = zarr.open(TMP_DATASET_PATH)
+    z.attrs["uuid"] = str(uuid.uuid4())
+
+    # register the dataset
     run("anemoi-registry", "datasets", TMP_DATASET_PATH, "--register")
+
+    # check that the dataset is registered
     run("anemoi-registry", "datasets", TMP_DATASET_PATH)

+    print("# Setup done")

-def teardown_module(raise_if_error=True):
-    error = None
+
+def _setup_module():
+    _teardown_module(raise_if_error=False)
+    setup_experiments()
+    setup_checkpoints()
+    setup_datasets()
+
+
+def teardown_experiments(errors):
     try:
-        run("anemoi-registry", "weights", "a5275e04-0000-0000-a0f6-be19591b09fe", "--unregister")
+        run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml", "--unregister")
     except Exception as e:
-        error = e
+        errors.append(e)

+
+def teardown_checkpoints(errors):
     try:
-        run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml", "--unregister")
+        run("anemoi-registry", "weights", "a5275e04-0000-0000-a0f6-be19591b09fe", "--unregister")
     except Exception as e:
-        error = e
+        errors.append(e)

+
+def teardown_datasets(errors):
     try:
         run("anemoi-registry", "datasets", TMP_DATASET, "--unregister")
-        os.remove(TMP_DATASET_PATH)
     except Exception as e:
-        error = e
-    if error and raise_if_error:
-        raise error
+        errors.append(e)
+
+    try:
+        os.remove(TMP_RECIPE)
+    except Exception as e:
+        errors.append(e)
+
+    try:
+        shutil.rmtree(TMP_DATASET_PATH)
+    except Exception as e:
+        errors.append(e)
+
+
+def _teardown_module(raise_if_error=True):
+    errors = []
+    teardown_experiments(errors)
+    teardown_checkpoints(errors)
+    teardown_datasets(errors)
+    if errors and raise_if_error:
+        for e in errors:
+            print(e)
+        raise e


-def test_datasets():
+def _test_datasets():
     # assert run("anemoi-registry", "datasets", TMP_DATASET) == 1
     run("anemoi-registry", "datasets", TMP_DATASET)
-    run("anemoi-registry", "datasets", TMP_DATASET, "--set-recipe", "./dummy-recipe-dataset.yaml")
+    run("anemoi-registry", "datasets", TMP_DATASET, "--set-recipe", TMP_RECIPE)
     run("anemoi-registry", "datasets", TMP_DATASET, "--set-status", "testing")
     run(
         "anemoi-registry",
@@ -92,11 +154,14 @@
     )
     run("anemoi-registry", "datasets", TMP_DATASET, "--add-location", "ewc")

-    # do not upload the dataset to avoid polluting the s3 bucket, until we have a way to clean it up automatically
-    # run("anemoi-registry", "datasets", TMP_DATASET_PATH, "--add-location", "ewc", "--upload")
+    # This is polluting the s3 bucket, we should have a way to clean it up automatically
+    run("anemoi-registry", "datasets", TMP_DATASET_PATH, "--add-location", "ewc", "--upload")
+
+    run("anemoi-registry", "update", "--catalogue-from-recipe-file", TMP_RECIPE, "--force", "--update")
+    run("anemoi-registry", "update", "--zarr-file-from-catalogue", TMP_DATASET_PATH, "--force")


-def test_weights():
+def _test_weights():
     # assert run("anemoi-registry", "weights", "a5275e04-0000-0000-a0f6-be19591b09fe") == 1
     run("anemoi-registry", "weights", "a5275e04-0000-0000-a0f6-be19591b09fe")
     run(
@@ -110,32 +175,57 @@
     )


-def test_experiments():
+def _test_experiments():
     run("anemoi-registry", "experiments", "i4df")
     run("anemoi-registry", "experiments", "i4df", "--add-plots", "./dummy-quaver.pdf")
     run("anemoi-registry", "experiments", "i4df", "--add-weights", "./dummy-checkpoint.ckpt")


-def test_list_commands():
+def _test_list_commands():
     run("anemoi-registry", "list", "experiments")
     run("anemoi-registry", "list", "weights")
     run("anemoi-registry", "list", "datasets")


+def test_print():
+    print("test")
+
+
 if __name__ == "__main__":
-    test_list_commands()
+    _test_list_commands()
+    print()
+
+    errors = []
+
+    print("# Start setup")
+    setup_datasets()
+    try:
+        _test_datasets()
+    finally:
+        print("# Start teardown")
+        teardown_datasets(errors)
+
+    print()
+
+    print("# Start setup")
+    setup_experiments()
+    try:
+        _test_experiments()
+    finally:
+        print("# Start teardown")
+        teardown_experiments(errors)

     print()
     print("# Start setup")
-    setup_module()
+    setup_checkpoints()
     try:
-        print()
-        test_datasets()
-        print()
-        test_weights()
-        print()
-        test_experiments()
-        print()
+        _test_weights()
     finally:
         print("# Start teardown")
-        teardown_module()
+        teardown_checkpoints(errors)
+
+    if errors:
+        for e in errors:
+            print(e)
+        raise e
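Renaming the test functions with a leading underscore removes them from pytest collection, so only `test_print` runs in CI while the full suite stays runnable via `python tests/test_all.py`. An alternative sketch (hypothetical, not part of this change) would keep pytest collection and gate the live tests behind an opt-in environment variable:

```python
import os

import pytest

# ANEMOI_REGISTRY_LIVE_TESTS is a hypothetical opt-in switch: the live
# tests need catalogue credentials and network access.
requires_catalogue = pytest.mark.skipif(
    "ANEMOI_REGISTRY_LIVE_TESTS" not in os.environ,
    reason="needs catalogue credentials and network access",
)

@requires_catalogue
def test_datasets():
    setup_datasets()
    errors = []
    try:
        _test_datasets()
    finally:
        teardown_datasets(errors)
    assert not errors
```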