diff --git a/CHANGELOG.md b/CHANGELOG.md index 38aff34..bf9ae67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Keep it human-readable, your future self will thank you! - Fix docstring errors - Fix import errors [#18](https://github.com/ecmwf/anemoi-registry/pull/18) - Remove usage of obsolete upload function from anemoi-utils. +- Add worker to update datasets. ### Changed - Replaces the deploy workflow with cd-pypi diff --git a/docs/naming-conventions.rst b/docs/naming-conventions.rst index ffd605f..c55e079 100644 --- a/docs/naming-conventions.rst +++ b/docs/naming-conventions.rst @@ -74,7 +74,7 @@ The tables below provides more details and some examples. aifs-od-an-oper-0001-mars-o96-2020-2020-6h-v5 - - **frequency** - - 1h (could be : 6h) + - 1h (could be : 6h, 10m for 10 minutes) - - **version** diff --git a/src/anemoi/registry/commands/update.py b/src/anemoi/registry/commands/update.py index 49abd6e..3380a03 100644 --- a/src/anemoi/registry/commands/update.py +++ b/src/anemoi/registry/commands/update.py @@ -29,6 +29,7 @@ def _shorten(d): class Update: + """Update""" internal = True timestamp = True @@ -82,12 +83,19 @@ def run(self, args): elif args.zarr_file_from_catalogue: method = self.zarr_file_from_catalogue + def _error(message): + LOG.error(message) + if not args.ignore: + raise ValueError(message) + LOG.error("%s", message) + LOG.warning("Continuing with --ignore.") + for path in args.paths: if args.resume and path in done: LOG.info(f"Skipping {path}") continue try: - method(path, args) + method(path, _error=_error, **vars(args)) except Exception as e: if args.continue_: LOG.exception(e) @@ -97,55 +105,64 @@ def run(self, args): with open(args.progress, "a") as f: print(path, file=f) - def _error(self, args, message): - LOG.error(message) - if not args.ignore: - raise ValueError(message) - LOG.error("%s", message) - LOG.warning("Continuing with --ignore.") + def catalogue_from_recipe_file(self, path, _error, workdir, dry_run, force, 
update, ignore, debug, **kwargs): + return catalogue_from_recipe_file( + path, + workdir=workdir, + dry_run=dry_run, + force=force, + update=update, + ignore=ignore, + debug=debug, + _error=_error, + ) - def catalogue_from_recipe_file(self, path, args): - """Update the catalogue entry a recipe file.""" + def zarr_file_from_catalogue(self, path, _error, dry_run, ignore, **kwargs): + return zarr_file_from_catalogue(path, dry_run=dry_run, ignore=ignore, _error=_error) - from anemoi.datasets import open_dataset - from anemoi.datasets.create import creator_factory - def entry_set_value(path, value): - if args.dry_run: - LOG.info(f"Would set value {path} to {_shorten(value)}") - else: - LOG.info(f"Setting value {path} to {_shorten(value)}") - entry.set_value(path, value) +def catalogue_from_recipe_file(path, *, workdir, dry_run, force, update, ignore, debug, _error=print): + """Update the catalogue entry a recipe file.""" - LOG.info(f"Updating catalogue entry from recipe: {path}") + from anemoi.datasets import open_dataset + from anemoi.datasets.create import creator_factory - with open(path) as f: - recipe = yaml.safe_load(f) + def entry_set_value(path, value): + if dry_run: + LOG.info(f"Would set value {path} to {_shorten(value)}") + else: + LOG.info(f"Setting value {path} to {_shorten(value)}") + entry.set_value(path, value) - if "name" not in recipe: - self._error(args, "Recipe does not contain a 'name' field.") - return + LOG.info(f"Updating catalogue entry from recipe: {path}") - name = recipe["name"] - base, _ = os.path.splitext(os.path.basename(path)) + with open(path) as f: + recipe = yaml.safe_load(f) - if name != base: - self._error(args, f"Recipe name '{name}' does not match file name '{path}'") + if "name" not in recipe: + _error("Recipe does not contain a 'name' field.") + return - try: - entry = Dataset(name, params={"_": True}) - except CatalogueEntryNotFound: - if args.ignore: - LOG.error(f"Entry not found: {name}") - return - raise + name = 
recipe["name"] + base, _ = os.path.splitext(os.path.basename(path)) - updated = entry.record["metadata"].get("updated", 0) + if name != base: + _error(f"Recipe name '{name}' does not match file name '{path}'") - if "recipe" in entry.record["_original"]["metadata"]: - LOG.info("%s: `recipe` already in original. Use --force and --update to update", name) - if not args.update or not args.force: - return + try: + entry = Dataset(name, params={"_": True}) + except CatalogueEntryNotFound: + if ignore: + LOG.error(f"Entry not found: {name}") + return + raise + + updated = entry.record["metadata"].get("updated", 0) + + if "recipe" in entry.record["_original"]["metadata"]: + LOG.info("%s: `recipe` already in original. Use --force and --update to update", name) + if not update or not force: + return # Remove stuff added by prepml for k in [ @@ -161,9 +178,9 @@ def entry_set_value(path, value): ]: recipe.pop(k, None) - if "recipe" not in entry.record["metadata"] or args.force: + if "recipe" not in entry.record["metadata"] or force: LOG.info("%s, setting `recipe` 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥", name) - if args.dry_run: + if dry_run: LOG.info("Would set recipe %s", name) else: LOG.info("Setting recipe %s", name) @@ -175,7 +192,7 @@ def entry_set_value(path, value): constant_fields = entry.record["metadata"].get("constant_fields", []) if computed_constant_fields != constant_fields: LOG.info("%s, setting `constant_fields`", name) - if args.dry_run: + if dry_run: LOG.info("Would set constant_fields %s", name) else: LOG.info("Setting constant_fields %s", name) @@ -188,153 +205,154 @@ def entry_set_value(path, value): constants = entry.record["metadata"]["constant_fields"] variables_metadata = entry.record["metadata"]["variables_metadata"] - changed = False - for k, v in variables_metadata.items(): - - if k in constants and v.get("constant_in_time") is not True: - v["constant_in_time"] = True - changed = True - LOG.info(f"Setting {k} constant_in_time to True") + changed = False + for k, v in 
variables_metadata.items(): - if "is_constant_in_time" in v: - del v["is_constant_in_time"] - changed = True + if k in constants and v.get("constant_in_time") is not True: + v["constant_in_time"] = True + changed = True + LOG.info(f"Setting {k} constant_in_time to True") - if changed: - if args.debug: - with open(f"{name}.variables_metadata.json", "w") as f: - print(json.dumps(variables_metadata, indent=2), file=f) - entry_set_value("/metadata/variables_metadata", variables_metadata) - entry_set_value("/metadata/updated", updated + 1) - else: - LOG.info("No changes required") + if "is_constant_in_time" in v: + del v["is_constant_in_time"] + changed = True - if "variables_metadata" not in entry.record["metadata"] or args.force: - LOG.info("%s, setting `variables_metadata` 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥", name) + if changed: + if debug: + with open(f"{name}.variables_metadata.json", "w") as f: + print(json.dumps(variables_metadata, indent=2), file=f) + entry_set_value("/metadata/variables_metadata", variables_metadata) + entry_set_value("/metadata/updated", updated + 1) + else: + LOG.info("No changes required") - if args.dry_run: - LOG.info("Would set `variables_metadata` %s", name) - else: + if "variables_metadata" not in entry.record["metadata"] or force: + LOG.info("%s, setting `variables_metadata` 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥", name) - dir = os.path.join(args.workdir, f"anemoi-registry-commands-update-{time.time()}") - os.makedirs(dir) + if dry_run: + LOG.info("Would set `variables_metadata` %s", name) + else: - try: - tmp = os.path.join(dir, "tmp.zarr") + dir = os.path.join(workdir, f"anemoi-registry-commands-update-{time.time()}") + os.makedirs(dir) - creator_factory("init", config=path, path=tmp, overwrite=True).run() - - with open(f"{tmp}/.zattrs") as f: - variables_metadata = yaml.safe_load(f)["variables_metadata"] - if args.debug: - with open(f"{name}.variables_metadata.json", "w") as f: - print(json.dumps(variables_metadata, indent=2), file=f) - LOG.info("Setting variables_metadata 
%s", name) - entry_set_value("/metadata/variables_metadata", variables_metadata) - entry_set_value("/metadata/updated", updated + 1) + try: + tmp = os.path.join(dir, "tmp.zarr") - finally: - shutil.rmtree(dir) + creator_factory("init", config=path, path=tmp, overwrite=True).run() - def zarr_file_from_catalogue(self, path, args): - import zarr + with open(f"{tmp}/.zattrs") as f: + variables_metadata = yaml.safe_load(f)["variables_metadata"] + if debug: + with open(f"{name}.variables_metadata.json", "w") as f: + print(json.dumps(variables_metadata, indent=2), file=f) + LOG.info("Setting variables_metadata %s", name) + entry_set_value("/metadata/variables_metadata", variables_metadata) + entry_set_value("/metadata/updated", updated + 1) - LOG.info(f"Updating zarr file from catalogue: {path}") + finally: + shutil.rmtree(dir) - if not os.path.exists(path): - self._error(args, f"File not found: {path}") - return - z = zarr.open(path) - metadata = z.attrs.asdict() +def zarr_file_from_catalogue(path, *, dry_run, ignore, _error=print): + import zarr - if "uuid" not in metadata: - self._error(args, "Zarr metadata does not have a 'uuid' field.") + LOG.info(f"Updating zarr file from catalogue: {path}") - match = None - for e in DatasetCatalogueEntryList().get(params={"uuid": metadata["uuid"]}): - if match: - self._error(args, f"Multiple entries found for uuid {metadata['uuid']}") - match = e + if not os.path.exists(path) and not path.startswith("s3://"): + _error(f"File not found: {path}") + return - if match is None: - self._error(args, f"No entry found for uuid {metadata['uuid']}") + z = zarr.open(path) + metadata = z.attrs.asdict() - name = match["name"] - base, _ = os.path.splitext(os.path.basename(path)) + if "uuid" not in metadata: + _error("Zarr metadata does not have a 'uuid' field.") - if name != base: - self._error(args, f"Metadata name '{name}' does not match file name '{path}'") + match = None + for e in DatasetCatalogueEntryList().get(params={"uuid": 
metadata["uuid"]}): + if match: + _error(f"Multiple entries found for uuid {metadata['uuid']}") + match = e - try: - entry = Dataset(name) - except CatalogueEntryNotFound: - if args.force: - LOG.error(f"Entry not found: {name}") - return - raise + if match is None: + _error(f"No entry found for uuid {metadata['uuid']}") - def dict_are_different(d1, d2, path=""): + name = match["name"] + base, _ = os.path.splitext(os.path.basename(path)) - def _(d): - return textwrap.shorten(json.dumps(d, ensure_ascii=False), width=80, placeholder="...") + if name != base: + _error(f"Metadata name '{name}' does not match file name '{path}'") - diff = False + try: + entry = Dataset(name) + except CatalogueEntryNotFound: + if ignore: + LOG.error(f"Entry not found: {name}") + return + raise - if d1 == d2: - return False + def dict_are_different(d1, d2, path=""): - if type(d1) is not type(d2): - print(f"Type mismatch at {path}: {type(d1)} != {type(d2)}") - return True + def _(d): + return textwrap.shorten(json.dumps(d, ensure_ascii=False), width=80, placeholder="...") - if isinstance(d1, dict) and isinstance(d2, dict): - for k in d1.keys(): - if k not in d2: - print(f"Key {path + '.' + k} is missing in the local dictionary {_(d1[k])}") - diff = True + diff = False - if k in d1 and k in d2 and dict_are_different(d1[k], d2[k], path + "." + k): - diff = True + if d1 == d2: + return False - for k in d2.keys(): - if k not in d1: - print(f"Key {path + '.' + k} is missing in the remote dictionary {_(d2[k])}") - diff = True + if type(d1) is not type(d2): + print(f"Type mismatch at {path}: {type(d1)} != {type(d2)}") + return True - return diff + if isinstance(d1, dict) and isinstance(d2, dict): + for k in d1.keys(): + if k not in d2: + print(f"Key {path + '.' 
+ k} is missing in the local dictionary {_(d1[k])}") + diff = True - if isinstance(d1, list) and isinstance(d2, list): - if len(d1) != len(d2): - print(f"List length mismatch at {path}: {len(d1)} != {len(d2)}") - return True + if k in d1 and k in d2 and dict_are_different(d1[k], d2[k], path + "." + k): + diff = True - for i, (a, b) in enumerate(zip(d1, d2)): - if dict_are_different(a, b, path + f"[{i}]"): - diff = True + for k in d2.keys(): + if k not in d1: + print(f"Key {path + '.' + k} is missing in the remote dictionary {_(d2[k])}") + diff = True - return diff + return diff - if d1 != d2: - print(f"Value differs at {path}: {d1} != {d2}") + if isinstance(d1, list) and isinstance(d2, list): + if len(d1) != len(d2): + print(f"List length mismatch at {path}: {len(d1)} != {len(d2)}") return True + for i, (a, b) in enumerate(zip(d1, d2)): + if dict_are_different(a, b, path + f"[{i}]"): + diff = True + return diff - # Example usage - entry_metadata = entry.record["metadata"] - diff = dict_are_different(entry_metadata, metadata) + if d1 != d2: + print(f"Value differs at {path}: {d1} != {d2}") + return True - if not diff: - LOG.info(f"Metadata is up to date: {name}") - return + return diff - if args.dry_run: - return + # Example usage + entry_metadata = entry.record["metadata"] + diff = dict_are_different(entry_metadata, metadata) + + if not diff: + LOG.info(f"Metadata is up to date: {name}") + return + + if dry_run: + return - z = zarr.open(path, mode="a") - LOG.info(f"Updating metadata: {name}") - z.attrs.update(entry_metadata) + z = zarr.open(path, mode="a") + LOG.info(f"Updating metadata: {name}") + z.attrs.update(entry_metadata) command = Update diff --git a/src/anemoi/registry/commands/worker.py b/src/anemoi/registry/commands/worker.py index 0cf8aa5..cc256e9 100644 --- a/src/anemoi/registry/commands/worker.py +++ b/src/anemoi/registry/commands/worker.py @@ -45,6 +45,10 @@ def add_arguments(self, command_parser): transfer.add_argument("--threads", help="Number of 
threads to use", type=int) transfer.add_argument("--filter-tasks", help="Filter tasks to process (key=value list)", nargs="*", default=[]) + update = subparsers.add_parser("update-dataset", help="Update datasets from the catalogue") + update.add_argument("--destination", help="Platform destination (e.g. leonardo, lumi, marenostrum)") + update.add_argument("--directory", help="The directory where the datasets are located.") + delete = subparsers.add_parser("delete-dataset", help="Delete dataset") delete.add_argument("--platform", help="Platform destination (e.g. leonardo, lumi, marenostrum)") delete.add_argument("--filter-tasks", help="Filter tasks to process (key=value list)", nargs="*", default=[]) @@ -52,7 +56,7 @@ def add_arguments(self, command_parser): dummy = subparsers.add_parser("dummy", help="Dummy worker for test purposes") dummy.add_argument("--arg") - for subparser in [transfer, delete, dummy]: + for subparser in [transfer, delete, dummy, update]: subparser.add_argument("--timeout", help="Die with timeout (SIGALARM) after TIMEOUT seconds.", type=int) subparser.add_argument("--wait", help="Check for new task every WAIT seconds.", type=int) subparser.add_argument("--heartbeat", help="Heartbeat interval", type=int) diff --git a/src/anemoi/registry/workers/__init__.py b/src/anemoi/registry/workers/__init__.py index 5bf6cf7..6c829b2 100644 --- a/src/anemoi/registry/workers/__init__.py +++ b/src/anemoi/registry/workers/__init__.py @@ -17,6 +17,7 @@ from anemoi.utils.humanize import when from anemoi.registry import config +from anemoi.registry.tasks import TaskCatalogueEntry from anemoi.registry.tasks import TaskCatalogueEntryList # from anemoi.utils.provenance import trace_info @@ -131,7 +132,8 @@ def send_heartbeat(): thread.join() @classmethod - def parse_task(cls, task, *keys): + def parse_task(cls, task: TaskCatalogueEntry, *keys: list[str]): + """Parse a task (from the catalogue) and return a list of values for the given keys.""" data = 
task.record.copy() assert isinstance(data, dict), data @@ -208,6 +210,7 @@ def run_worker(action, **kwargs): from .delete_dataset import DeleteDatasetWorker from .transfer_dataset import TransferDatasetWorker + from .update_dataset import UpdateDatasetWorker workers_config = config().get("workers", {}) worker_config = workers_config.get(action, {}) @@ -231,6 +234,7 @@ def run_worker(action, **kwargs): cls = { "transfer-dataset": TransferDatasetWorker, "delete-dataset": DeleteDatasetWorker, + "update-dataset": UpdateDatasetWorker, "dummy": DummyWorker, }[action] cls(**kwargs).run() diff --git a/src/anemoi/registry/workers/update_dataset.py b/src/anemoi/registry/workers/update_dataset.py new file mode 100644 index 0000000..4746272 --- /dev/null +++ b/src/anemoi/registry/workers/update_dataset.py @@ -0,0 +1,94 @@ +# (C) Copyright 2024 Anemoi contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + + +import logging +import os + +from anemoi.registry.entry.dataset import DatasetCatalogueEntry + +from . 
import Worker + +LOG = logging.getLogger(__name__) + + +class UpdateDatasetWorker(Worker): + """Patch datasets from the catalogue""" + + name = "update-dataset" + + def __init__( + self, + destination, + directory=None, + filter_tasks={}, + **kwargs, + ): + super().__init__(**kwargs) + + self.destination = destination + self.directory = directory + if not self.destination: + raise ValueError("No destination platform specified") + + if not self.directory: + # in this case should get the directory from the catalogue + raise NotImplementedError("No directory specified") + + if self.directory: + if not os.path.exists(self.directory): + raise ValueError(f"Directory {self.directory} does not exist") + + self.filter_tasks.update(filter_tasks) + self.filter_tasks["destination"] = self.destination + + def worker_process_task(self, task): + from anemoi.registry.commands.update import zarr_file_from_catalogue + + (destination,) = self.parse_task(task) + assert destination == self.destination, (destination, self.destination) + + for path in os.listdir(self.directory): + path = os.path.join(self.directory, path) + if not path.endswith(".zarr"): + continue + name = os.path.basename(path)[:-5] + + LOG.info(f"Updating dataset '{name}' from catalogue on '{destination}'") + + def check_path(name, path): + entry = DatasetCatalogueEntry(key=name) + locations = entry.record["locations"] + if destination not in locations: + raise ValueError( + f"Platform '{destination}' not registerd as a location in the catalogue. Not updating." + ) + catalogue_path = locations[self.destination]["path"] + if os.path.realpath(path) != os.path.realpath(catalogue_path): + raise ValueError(f"Path '{path}' does not match catalogue path {catalogue_path}. 
Not updating.") + + try: + check_path(name, path) + zarr_file_from_catalogue(path, dry_run=False, ignore=False, _error=print) + except Exception as e: + LOG.error(f"Error updating {path}: {e}") + continue + + @classmethod + def parse_task(cls, task): + assert task.record["action"] == "update-dataset", task.record["action"] + + destination = super().parse_task(task, "destination") + + if "/" in destination: + raise ValueError(f"Destination '{destination}' must not contain '/', this is a platform name") + if "." in destination: + raise ValueError(f"Destination '{destination}' must not contain '.', this is a platform name") + + return destination diff --git a/tests/dummy-recipe-dataset.yaml b/tests/recipe.yaml similarity index 100% rename from tests/dummy-recipe-dataset.yaml rename to tests/recipe.yaml diff --git a/tests/test_all.py b/tests/test_all.py old mode 100755 new mode 100644 index 22ff7ba..ace762c --- a/tests/test_all.py +++ b/tests/test_all.py @@ -8,8 +8,12 @@ # nor does it submit to any jurisdiction. 
import os +import shutil import subprocess +import uuid +import yaml +import zarr from anemoi.utils.remote import transfer DATASET = "aifs-ea-an-oper-0001-mars-20p0-1979-1979-6h-v0-testing" @@ -19,6 +23,7 @@ TMP_DATASET = f"{DATASET}-{pid}" TMP_DATASET_PATH = f"{TMP_DATASET}.zarr" +TMP_RECIPE = f"./{TMP_DATASET}.yaml" DATASET_URL = "s3://ml-tests/test-data/anemoi-datasets/create/pipe.zarr/" @@ -26,61 +31,108 @@ def run(*args): print(" ".join(args)) try: - subprocess.check_call(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + result.check_returncode() except Exception as e: + print("----------------SDTOUT----------------") + print(result.stdout) + print("----------------SDERR----------------") + print(result.stderr) + print("-------------------------------------") e.add_note = f"Command failed: {' '.join(args)}" raise -def setup_module(): - teardown_module(raise_if_error=False) +def setup_experiments(): run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml", "--register") run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml") + +def setup_checkpoints(): run("anemoi-registry", "weights", "./dummy-checkpoint.ckpt", "--register") run("anemoi-registry", "weights", "./dummy-checkpoint.ckpt") + +def setup_datasets(): + # cache to avoid downloading the dataset when re-running the tests if not os.path.exists(DATASET_PATH): transfer(DATASET_URL, DATASET_PATH, overwrite=True) - import uuid + assert os.path.exists(DATASET_PATH) - import zarr + # create a temporary recipe file with the right name in + with open("./recipe.yaml", "r") as f: + r = yaml.load(f, Loader=yaml.FullLoader) + r["name"] = TMP_DATASET + with open(TMP_RECIPE, "w") as f: + yaml.dump(r, f) - z = zarr.open(DATASET_PATH) - z.attrs["uuid"] = str(uuid.uuid4()) - assert os.path.exists(DATASET_PATH) + # set new uuid to the tmp dataset + shutil.copytree(DATASET_PATH, TMP_DATASET_PATH) + 
z = zarr.open(TMP_DATASET_PATH) + z.attrs["uuid"] = str(uuid.uuid4()) - os.symlink(DATASET_PATH, TMP_DATASET_PATH) + # register the dataset run("anemoi-registry", "datasets", TMP_DATASET_PATH, "--register") + + # check that the dataset is registered run("anemoi-registry", "datasets", TMP_DATASET_PATH) + print("# Setup done") -def teardown_module(raise_if_error=True): - error = None +def _setup_module(): + _teardown_module(raise_if_error=False) + setup_experiments() + setup_checkpoints() + setup_datasets() + + +def teardown_experiments(errors): try: - run("anemoi-registry", "weights", "a5275e04-0000-0000-a0f6-be19591b09fe", "--unregister") + run("anemoi-registry", "experiments", "./dummp-recipe-experiment.yaml", "--unregister") except Exception as e: - error = e + errors.append(e) + +def teardown_checkpoints(errors): try: - run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml", "--unregister") + run("anemoi-registry", "weights", "a5275e04-0000-0000-a0f6-be19591b09fe", "--unregister") except Exception as e: - error = e + errors.append(e) + +def teardown_datasets(errors): try: run("anemoi-registry", "datasets", TMP_DATASET, "--unregister") - os.remove(TMP_DATASET_PATH) except Exception as e: - error = e - if error and raise_if_error: - raise error + errors.append(e) + + try: + os.remove(TMP_RECIPE) + except Exception as e: + errors.append(e) + + try: + shutil.rmtree(TMP_DATASET_PATH) + except Exception as e: + errors.append(e) + + +def _teardown_module(raise_if_error=True): + errors = [] + teardown_experiments(errors) + teardown_checkpoints(errors) + teardown_datasets(errors) + if errors and raise_if_error: + for e in errors: + print(e) + raise e -def test_datasets(): +def _test_datasets(): # assert run("anemoi-registry", "datasets", TMP_DATASET) == 1 run("anemoi-registry", "datasets", TMP_DATASET) - run("anemoi-registry", "datasets", TMP_DATASET, "--set-recipe", "./dummy-recipe-dataset.yaml") + run("anemoi-registry", "datasets", TMP_DATASET, 
"--set-recipe", TMP_RECIPE) run("anemoi-registry", "datasets", TMP_DATASET, "--set-status", "testing") run( "anemoi-registry", @@ -102,11 +154,14 @@ def test_datasets(): ) run("anemoi-registry", "datasets", TMP_DATASET, "--add-location", "ewc") - # do not upload the dataset to avoid polluting the s3 bucket, until we have a way to clean it up automatically - # run("anemoi-registry", "datasets", TMP_DATASET_PATH, "--add-location", "ewc", "--upload") + # This is polluting the s3 bucket, we should have a way to clean it up automatically + run("anemoi-registry", "datasets", TMP_DATASET_PATH, "--add-location", "ewc", "--upload") + run("anemoi-registry", "update", "--catalogue-from-recipe-file", TMP_RECIPE, "--force", "--update") + run("anemoi-registry", "update", "--zarr-file-from-catalogue", TMP_DATASET_PATH, "--force") -def test_weights(): + +def _test_weights(): # assert run("anemoi-registry", "weights", "a5275e04-0000-0000-a0f6-be19591b09fe") == 1 run("anemoi-registry", "weights", "a5275e04-0000-0000-a0f6-be19591b09fe") run( @@ -120,32 +175,57 @@ def test_weights(): ) -def test_experiments(): +def _test_experiments(): run("anemoi-registry", "experiments", "i4df") run("anemoi-registry", "experiments", "i4df", "--add-plots", "./dummy-quaver.pdf") run("anemoi-registry", "experiments", "i4df", "--add-weights", "./dummy-checkpoint.ckpt") -def test_list_commands(): +def _test_list_commands(): run("anemoi-registry", "list", "experiments") run("anemoi-registry", "list", "weights") run("anemoi-registry", "list", "datasets") +def test_print(): + print("test") + + if __name__ == "__main__": - test_list_commands() + _test_list_commands() print() + errors = [] + print("# Start setup") - setup_module() + setup_datasets() try: - print() - test_datasets() - print() - test_weights() - print() - test_experiments() - print() + _test_datasets() finally: print("# Start teardown") - teardown_module() + teardown_datasets(errors) + + print() + + print("# Start setup") + setup_experiments() + 
try: + _test_experiments() + finally: + print("# Start teardown") + teardown_experiments(errors) + + print() + + print("# Start setup") + setup_checkpoints() + try: + _test_weights() + finally: + print("# Start teardown") + teardown_checkpoints(errors) + + if errors: + for e in errors: + print(e) + raise e