From 4c03f5b8b429d2b040caa5d0a74f2dc5991db8ff Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 16:29:11 -0400 Subject: [PATCH 01/10] update recipe with injection + new output path --- .github/workflows/deploy_recipe.yaml | 2 +- feedstock/recipe.py | 53 +++++++++++++++++++++++++++- global_config.json | 3 +- scripts/update_attrs.py | 7 ++-- 4 files changed, 59 insertions(+), 6 deletions(-) diff --git a/.github/workflows/deploy_recipe.yaml b/.github/workflows/deploy_recipe.yaml index b358a54..00f2105 100644 --- a/.github/workflows/deploy_recipe.yaml +++ b/.github/workflows/deploy_recipe.yaml @@ -50,7 +50,7 @@ jobs: break fi echo "Waiting for Dataflow jobs to finish..." - sleep 30 + sleep 20 done update-attrs: needs: deploy-recipes diff --git a/feedstock/recipe.py b/feedstock/recipe.py index c0ed32f..6bc15f7 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -3,9 +3,11 @@ """ import zarr import json +import pathlib import os from dataclasses import dataclass import apache_beam as beam +from datetime import datetime, timezone from pangeo_forge_recipes.patterns import pattern_from_file_sequence from pangeo_forge_recipes.transforms import ( OpenURLWithFSSpec, @@ -14,6 +16,8 @@ ConsolidateMetadata, ConsolidateDimensionCoordinates, ) +from ruamel.yaml import YAML +yaml = YAML(typ='safe') # copied from cmip feedstock (TODO: move to central repo?) @dataclass @@ -31,7 +35,7 @@ def _copy(self,store: zarr.storage.FSStore) -> zarr.storage.FSStore: # gcs = gcsio.GcsIO() # gcs.copytree(source, target) print(f"HERE: Copying {source} to {target}") - fs = gcsfs.GCSFileSystem() + fs = gcsfs.GCSFileSystem() # FIXME: How can we generalize this? fs.cp(source, target, recursive=True) # return a new store with the new path that behaves exactly like the input # to this stage (so we can slot this stage right before testing/logging stages) @@ -41,6 +45,23 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: return (pcoll | "Copying Store" >> beam.Map(self._copy) ) + +@dataclass +class InjectAttrs(beam.PTransform): + inject_attrs: dict + + def _update_zarr_attrs(self,store: zarr.storage.FSStore) -> zarr.storage.FSStore: + #TODO: Can we get a warning here if the store does not exist? + store = zarr.open(store, mode='a') + store.attrs.update(self.inject_attrs) + #? Should we consolidate here? We are explicitly doing that later... + return store + + def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + return (pcoll + | "Injecting Attributes" >> beam.Map(self._update_zarr_attrs) + ) +# TODO: Both these stages are generally useful. They should at least be in the utils package, maybe in recipes? # Common Parameters dataset_url = 'https://zenodo.org/record/7761881/files' @@ -48,6 +69,34 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: global_config = json.load(f) latest_store_prefix = global_config['latest_store_prefix'] +# Set up injection attributes +# This is for demonstration purposes only and should be discussed with the broader LEAP/PGF community +# - Bake in information from the top level of the meta.yaml +# - Add a timestamp +# - Add the git hash +# - Add link to the meta.yaml on main +# - Add the recipe id + +# read info from meta.yaml +meta_path = './feedstock/meta.yaml' +meta = yaml.load(pathlib.Path(meta_path)) +meta_yaml_url_main = f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}/blob/main/feedstock/meta.yaml" +git_url_hash = f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}/commit/{os.environ['GITHUB_SHA']}" +timestamp = datetime.now(timezone.utc).isoformat() + +#TODO: Can we make some of this a standard part of the injection stage? The user would only define stuff that should be overwritten. +injection_attrs = { + f"pangeo-forge-{k}": meta.get(k, 'none') for k in [ + 'description', + 'provenance', + 'maintainers', + ] +} +injection_attrs['latest_data_updated_git_hash'] = git_url_hash +injection_attrs['latest_data_updated_timestamp'] = timestamp +injection_attrs['ref_meta.yaml'] = meta_yaml_url_main + + ## Monthly version input_urls_a = [f'{dataset_url}/METAFLUX_GPP_RECO_monthly_{y}.nc' for y in range(2001, 2003)] input_urls_b = [f'{dataset_url}/METAFLUX_GPP_RECO_monthly_{y}.nc' for y in range(2003, 2005)] @@ -66,6 +115,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: # Maybe its better to find another way and avoid injections entirely... combine_dims=pattern_a.combine_dim_keys, ) + |InjectAttrs(injection_attrs) |ConsolidateDimensionCoordinates() |ConsolidateMetadata() |Copy(target_prefix=latest_store_prefix) @@ -79,6 +129,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: store_name='proto-b.zarr', combine_dims=pattern_b.combine_dim_keys, ) + |InjectAttrs(injection_attrs) |ConsolidateDimensionCoordinates() |ConsolidateMetadata() |Copy(target_prefix=latest_store_prefix) diff --git a/global_config.json b/global_config.json index 7dbea57..e63457c 100644 --- a/global_config.json +++ b/global_config.json @@ -1,3 +1,4 @@ { - "latest_store_prefix":"gs://leap-scratch/jbusecke/proto_feedstock/latest" + "latest_data_store_prefix":"gs://leap-scratch/jbusecke/proto_feedstock/latest/data", + "latest_catalog_store_prefix":"gs://leap-scratch/jbusecke/proto_feedstock/latest/catalog" } \ No newline at end of file diff --git a/scripts/update_attrs.py b/scripts/update_attrs.py index 097a3c1..cb43d7d 100644 --- a/scripts/update_attrs.py +++ b/scripts/update_attrs.py @@ -11,9 +11,10 @@ with open('global_config.json') as f: global_config = json.load(f) -latest_store_prefix = global_config['latest_store_prefix'] -print(f"Using {latest_store_prefix=}") +latest_data_store_prefix = global_config['latest_data_store_prefix'] + +print(f"Using {latest_data_store_prefix=}") fs = gcsfs.GCSFileSystem() @@ -44,7 +45,7 @@ def update_zarr_attrs(store_path:str, additonal_attrs:dict): assert 'object' in recipe.keys() #(FIXME: this does not support dict objects yet...) # Some how get the store path from the recipe - store_path = f'{latest_store_prefix}/{id}.zarr' + store_path = f'{latest_data_store_prefix}/{id}.zarr' # This only works if the store name in the recipe is the same as the id...which is easy to get wrong!!! # add the infor from the top level of the meta.yaml From 7a374a0dfa16937ab2e3b1f2d4cd58affff80531 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 16:31:15 -0400 Subject: [PATCH 02/10] remove attr update from recipe deployment --- .github/workflows/deploy_recipe.yaml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/workflows/deploy_recipe.yaml b/.github/workflows/deploy_recipe.yaml index 00f2105..e6736c6 100644 --- a/.github/workflows/deploy_recipe.yaml +++ b/.github/workflows/deploy_recipe.yaml @@ -52,8 +52,9 @@ jobs: echo "Waiting for Dataflow jobs to finish..." sleep 20 done - update-attrs: - needs: deploy-recipes - uses: ./.github/workflows/update_attrs.yaml - secrets: - GCP_DATAFLOW_SERVICE_KEY: ${{ secrets.GCP_DATAFLOW_SERVICE_KEY }} \ No newline at end of file + follow-up-task: + runs-on: ubuntu-latest + steps: + - name: just a dummy for now + run: | + echo "Here we would run the catalog zarr recipe in line" \ No newline at end of file From 1d77dd819584576ca19138d26373d1f9034bf0a0 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 16:34:53 -0400 Subject: [PATCH 03/10] fix dependency in workflow and path renaming --- .github/workflows/deploy_recipe.yaml | 1 + feedstock/recipe.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/deploy_recipe.yaml b/.github/workflows/deploy_recipe.yaml index e6736c6..181a1e2 100644 --- a/.github/workflows/deploy_recipe.yaml +++ b/.github/workflows/deploy_recipe.yaml @@ -53,6 +53,7 @@ jobs: sleep 20 done follow-up-task: + needs: deploy-recipes runs-on: ubuntu-latest steps: - name: just a dummy for now diff --git a/feedstock/recipe.py b/feedstock/recipe.py index 6bc15f7..9625bc5 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -67,7 +67,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: dataset_url = 'https://zenodo.org/record/7761881/files' with open('global_config.json') as f: global_config = json.load(f) -latest_store_prefix = global_config['latest_store_prefix'] +latest_data_store_prefix = global_config['latest_data_store_prefix'] # Set up injection attributes # This is for demonstration purposes only and should be discussed with the broader LEAP/PGF community @@ -118,7 +118,7 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: |InjectAttrs(injection_attrs) |ConsolidateDimensionCoordinates() |ConsolidateMetadata() - |Copy(target_prefix=latest_store_prefix) + |Copy(target_prefix=latest_data_store_prefix) ) proto_b = ( @@ -132,5 +132,5 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: |InjectAttrs(injection_attrs) |ConsolidateDimensionCoordinates() |ConsolidateMetadata() - |Copy(target_prefix=latest_store_prefix) + |Copy(target_prefix=latest_data_store_prefix) ) From e12fc63fc37f871d605b3b0a89bbdca2bb1fd2d4 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 16:38:44 -0400 Subject: [PATCH 04/10] restore meta.yaml --- feedstock/meta.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/feedstock/meta.yaml b/feedstock/meta.yaml index 1396f67..5a90f7e 100644 --- a/feedstock/meta.yaml +++ b/feedstock/meta.yaml @@ -7,8 +7,6 @@ recipes: - id: "proto-b" object: "recipe:proto_b" - pyramid_store: 'some-store-somewhere' - pyramid_store_public: 'false' provenance: providers: From fc772aebc6f0ceee8b8d7d1946a869001ae9656d Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 16:54:34 -0400 Subject: [PATCH 05/10] Try wrapping the output of StoreToZarr in zarr.storage.FSStore --- feedstock/recipe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/feedstock/recipe.py b/feedstock/recipe.py index 9625bc5..1680dfb 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -52,6 +52,8 @@ class InjectAttrs(beam.PTransform): def _update_zarr_attrs(self,store: zarr.storage.FSStore) -> zarr.storage.FSStore: #TODO: Can we get a warning here if the store does not exist? + # assert isinstance(store, zarr.storage.FSStore) + store = zarr.storage.FSStore(store) store = zarr.open(store, mode='a') store.attrs.update(self.inject_attrs) #? Should we consolidate here? We are explicitly doing that later... From a1027c927f342d597a1e778c6a6e2826fd569c15 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 16:57:07 -0400 Subject: [PATCH 06/10] Get an actual error out of this. --- feedstock/recipe.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/feedstock/recipe.py b/feedstock/recipe.py index 1680dfb..1837e7d 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -52,8 +52,7 @@ class InjectAttrs(beam.PTransform): def _update_zarr_attrs(self,store: zarr.storage.FSStore) -> zarr.storage.FSStore: #TODO: Can we get a warning here if the store does not exist? - # assert isinstance(store, zarr.storage.FSStore) - store = zarr.storage.FSStore(store) + assert isinstance(store, zarr.storage.FSStore) store = zarr.open(store, mode='a') store.attrs.update(self.inject_attrs) #? Should we consolidate here? We are explicitly doing that later... From d5a28655f43bbbeb766b1323786d0f8da8fc4f34 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 17:10:35 -0400 Subject: [PATCH 07/10] oh oops i was replacing the input in line --- feedstock/recipe.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/feedstock/recipe.py b/feedstock/recipe.py index 1837e7d..1c98d94 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -52,13 +52,12 @@ class InjectAttrs(beam.PTransform): def _update_zarr_attrs(self,store: zarr.storage.FSStore) -> zarr.storage.FSStore: #TODO: Can we get a warning here if the store does not exist? - assert isinstance(store, zarr.storage.FSStore) - store = zarr.open(store, mode='a') - store.attrs.update(self.inject_attrs) + attrs = zarr.open(store, mode='a').attrs + attrs.update(self.inject_attrs) #? Should we consolidate here? We are explicitly doing that later... return store - def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollection[zarr.storage.FSStore]: return (pcoll | "Injecting Attributes" >> beam.Map(self._update_zarr_attrs) ) @@ -97,6 +96,9 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: injection_attrs['latest_data_updated_timestamp'] = timestamp injection_attrs['ref_meta.yaml'] = meta_yaml_url_main +# FIXME: The above is not working, lets try with something simpler +injection_attrs = {'test':'test'} + ## Monthly version input_urls_a = [f'{dataset_url}/METAFLUX_GPP_RECO_monthly_{y}.nc' for y in range(2001, 2003)] From c2065458df65255c5f54d48a12189728b398d2c7 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 17:14:48 -0400 Subject: [PATCH 08/10] Make the recipes a lot smaller --- feedstock/recipe.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/feedstock/recipe.py b/feedstock/recipe.py index 1c98d94..c35f1bb 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -1,5 +1,5 @@ """ -A prototype based on MetaFlux +A synthetic prototype recipe """ import zarr import json @@ -64,7 +64,6 @@ def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollect # TODO: Both these stages are generally useful. They should at least be in the utils package, maybe in recipes? # Common Parameters -dataset_url = 'https://zenodo.org/record/7761881/files' with open('global_config.json') as f: global_config = json.load(f) latest_data_store_prefix = global_config['latest_data_store_prefix'] @@ -101,12 +100,19 @@ def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollect ## Monthly version -input_urls_a = [f'{dataset_url}/METAFLUX_GPP_RECO_monthly_{y}.nc' for y in range(2001, 2003)] -input_urls_b = [f'{dataset_url}/METAFLUX_GPP_RECO_monthly_{y}.nc' for y in range(2003, 2005)] +input_urls_a = [ + "gs://cmip6/pgf-debugging/hanging_bug/file_a.nc", + "gs://cmip6/pgf-debugging/hanging_bug/file_b.nc", +] +input_urls_b = [ + "gs://cmip6/pgf-debugging/hanging_bug/file_a_huge.nc", + "gs://cmip6/pgf-debugging/hanging_bug/file_b_huge.nc", +] pattern_a = pattern_from_file_sequence(input_urls_a, concat_dim='time') pattern_b = pattern_from_file_sequence(input_urls_b, concat_dim='time') +# very small recipe proto_a = ( beam.Create(pattern_a.items()) | OpenURLWithFSSpec() @@ -124,6 +130,7 @@ def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollect |Copy(target_prefix=latest_data_store_prefix) ) +# larger recipe proto_b = ( beam.Create(pattern_b.items()) | OpenURLWithFSSpec() From 732a455f4515caffb33e3ca46526a7a8384c0346 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 17:26:15 -0400 Subject: [PATCH 09/10] Try again with complex injections --- feedstock/recipe.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/feedstock/recipe.py b/feedstock/recipe.py index c35f1bb..3eecad5 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -95,10 +95,6 @@ def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollect injection_attrs['latest_data_updated_timestamp'] = timestamp injection_attrs['ref_meta.yaml'] = meta_yaml_url_main -# FIXME: The above is not working, lets try with something simpler -injection_attrs = {'test':'test'} - - ## Monthly version input_urls_a = [ "gs://cmip6/pgf-debugging/hanging_bug/file_a.nc", From 0db120cfa5bf469e458e48a99315897d7f42dd13 Mon Sep 17 00:00:00 2001 From: Julius Busecke Date: Tue, 9 Apr 2024 17:34:02 -0400 Subject: [PATCH 10/10] Change the ids, something is weird --- feedstock/meta.yaml | 17 +++++------------ feedstock/recipe.py | 8 ++++---- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/feedstock/meta.yaml b/feedstock/meta.yaml index 5a90f7e..312b63b 100644 --- a/feedstock/meta.yaml +++ b/feedstock/meta.yaml @@ -2,21 +2,14 @@ title: "LEAP Data Library Prototype" description: > A prototype test for the LEAP Data Library refactor recipes: - - id: "proto-a" + - id: "large" object: "recipe:proto_a" - - - id: "proto-b" - object: "recipe:proto_b" - + - id: "large" + object: "recipe:large" provenance: providers: - - name: "Zenodo" - description: "Zenodo" - roles: - - host - url: https://zenodo.org/record/7761881#.ZFv9OS-B30p - - name: "Nathaniel et al." - description: "Authors of MetaFlux: Meta-learning global carbon fluxes from sparse spatiotemporal observations" + - name: "Julius" + description: "Just a guy testing some recipes. Nothing to see here." roles: - producer - licensor diff --git a/feedstock/recipe.py b/feedstock/recipe.py index 3eecad5..3d47ef1 100644 --- a/feedstock/recipe.py +++ b/feedstock/recipe.py @@ -109,12 +109,12 @@ def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollect pattern_b = pattern_from_file_sequence(input_urls_b, concat_dim='time') # very small recipe -proto_a = ( +small = ( beam.Create(pattern_a.items()) | OpenURLWithFSSpec() | OpenWithXarray() | StoreToZarr( - store_name='proto-a.zarr', + store_name='small.zarr', #FIXME: This is brittle. it needs to be named exactly like in meta.yaml... # Can we inject this in the same way as the root? # Maybe its better to find another way and avoid injections entirely... @@ -127,12 +127,12 @@ def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollect ) # larger recipe -proto_b = ( +large = ( beam.Create(pattern_b.items()) | OpenURLWithFSSpec() | OpenWithXarray() | StoreToZarr( - store_name='proto-b.zarr', + store_name='large.zarr', combine_dims=pattern_b.combine_dim_keys, ) |InjectAttrs(injection_attrs)