Merge pull request #12 from leap-stc/recipe_attrs_injection
Add attr injection for the data zarr store
jbusecke authored Apr 9, 2024
2 parents 9c38934 + 0db120c commit 7f9366b
Showing 5 changed files with 87 additions and 35 deletions.
12 changes: 7 additions & 5 deletions .github/workflows/deploy_recipe.yaml
@@ -50,10 +50,12 @@ jobs:
break
fi
echo "Waiting for Dataflow jobs to finish..."
sleep 30
sleep 20
done
update-attrs:
follow-up-task:
needs: deploy-recipes
uses: ./.github/workflows/update_attrs.yaml
secrets:
GCP_DATAFLOW_SERVICE_KEY: ${{ secrets.GCP_DATAFLOW_SERVICE_KEY }}
runs-on: ubuntu-latest
steps:
- name: just a dummy for now
run: |
echo "Here we would run the catalog zarr recipe in line"
19 changes: 5 additions & 14 deletions feedstock/meta.yaml
@@ -2,23 +2,14 @@ title: "LEAP Data Library Prototype"
description: >
A prototype test for the LEAP Data Library refactor
recipes:
- id: "proto-a"
- id: "large"
object: "recipe:proto_a"

- id: "proto-b"
object: "recipe:proto_b"
pyramid_store: 'some-store-somewhere'
pyramid_store_public: 'false'

- id: "large"
object: "recipe:large"
provenance:
providers:
- name: "Zenodo"
description: "Zenodo"
roles:
- host
url: https://zenodo.org/record/7761881#.ZFv9OS-B30p
- name: "Nathaniel et al."
description: "Authors of MetaFlux: Meta-learning global carbon fluxes from sparse spatiotemporal observations"
- name: "Julius"
description: "Just a guy testing some recipes. Nothing to see here."
roles:
- producer
- licensor
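For reference, the recipe entries above are what downstream scripts iterate over; a minimal sketch of reading them back with ruamel.yaml (the same parser recipe.py uses), assuming the repository layout shown in this diff:

```python
import pathlib
from ruamel.yaml import YAML

yaml = YAML(typ="safe")
meta = yaml.load(pathlib.Path("feedstock/meta.yaml"))

# Each id is expected to match the store_name used in recipe.py,
# e.g. id "large" pairs with "large.zarr" (see the FIXME in recipe.py below).
for recipe in meta["recipes"]:
    print(recipe["id"], "->", recipe["object"])
```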
81 changes: 69 additions & 12 deletions feedstock/recipe.py
@@ -1,11 +1,13 @@
"""
A prototype based on MetaFlux
A synthetic prototype recipe
"""
import zarr
import json
import pathlib
import os
from dataclasses import dataclass
import apache_beam as beam
from datetime import datetime, timezone
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
@@ -14,6 +16,8 @@
ConsolidateMetadata,
ConsolidateDimensionCoordinates,
)
from ruamel.yaml import YAML
yaml = YAML(typ='safe')

# copied from cmip feedstock (TODO: move to central repo?)
@dataclass
@@ -31,7 +35,7 @@ def _copy(self,store: zarr.storage.FSStore) -> zarr.storage.FSStore:
# gcs = gcsio.GcsIO()
# gcs.copytree(source, target)
print(f"HERE: Copying {source} to {target}")
fs = gcsfs.GCSFileSystem()
fs = gcsfs.GCSFileSystem() # FIXME: How can we generalize this?
fs.cp(source, target, recursive=True)
# return a new store with the new path that behaves exactly like the input
# to this stage (so we can slot this stage right before testing/logging stages)
@@ -41,45 +45,98 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return (pcoll
| "Copying Store" >> beam.Map(self._copy)
)

@dataclass
class InjectAttrs(beam.PTransform):
inject_attrs: dict

def _update_zarr_attrs(self,store: zarr.storage.FSStore) -> zarr.storage.FSStore:
#TODO: Can we get a warning here if the store does not exist?
attrs = zarr.open(store, mode='a').attrs
attrs.update(self.inject_attrs)
#? Should we consolidate here? We are explicitly doing that later...
return store

def expand(self, pcoll: beam.PCollection[zarr.storage.FSStore]) -> beam.PCollection[zarr.storage.FSStore]:
return (pcoll
| "Injecting Attributes" >> beam.Map(self._update_zarr_attrs)
)
# TODO: Both these stages are generally useful. They should at least be in the utils package, maybe in recipes?

# Common Parameters
dataset_url = 'https://zenodo.org/record/7761881/files'
with open('global_config.json') as f:
global_config = json.load(f)
latest_store_prefix = global_config['latest_store_prefix']
latest_data_store_prefix = global_config['latest_data_store_prefix']

# Set up injection attributes
# This is for demonstration purposes only and should be discussed with the broader LEAP/PGF community
# - Bake in information from the top level of the meta.yaml
# - Add a timestamp
# - Add the git hash
# - Add link to the meta.yaml on main
# - Add the recipe id

# read info from meta.yaml
meta_path = './feedstock/meta.yaml'
meta = yaml.load(pathlib.Path(meta_path))
meta_yaml_url_main = f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}/blob/main/feedstock/meta.yaml"
git_url_hash = f"{os.environ['GITHUB_SERVER_URL']}/{os.environ['GITHUB_REPOSITORY']}/commit/{os.environ['GITHUB_SHA']}"
timestamp = datetime.now(timezone.utc).isoformat()

#TODO: Can we make some of this a standard part of the injection stage? The user would only define stuff that should be overwritten.
injection_attrs = {
f"pangeo-forge-{k}": meta.get(k, 'none') for k in [
'description',
'provenance',
'maintainers',
]
}
injection_attrs['latest_data_updated_git_hash'] = git_url_hash
injection_attrs['latest_data_updated_timestamp'] = timestamp
injection_attrs['ref_meta.yaml'] = meta_yaml_url_main

## Monthly version
input_urls_a = [f'{dataset_url}/METAFLUX_GPP_RECO_monthly_{y}.nc' for y in range(2001, 2003)]
input_urls_b = [f'{dataset_url}/METAFLUX_GPP_RECO_monthly_{y}.nc' for y in range(2003, 2005)]
input_urls_a = [
"gs://cmip6/pgf-debugging/hanging_bug/file_a.nc",
"gs://cmip6/pgf-debugging/hanging_bug/file_b.nc",
]
input_urls_b = [
"gs://cmip6/pgf-debugging/hanging_bug/file_a_huge.nc",
"gs://cmip6/pgf-debugging/hanging_bug/file_b_huge.nc",
]

pattern_a = pattern_from_file_sequence(input_urls_a, concat_dim='time')
pattern_b = pattern_from_file_sequence(input_urls_b, concat_dim='time')

proto_a = (
# very small recipe
small = (
beam.Create(pattern_a.items())
| OpenURLWithFSSpec()
| OpenWithXarray()
| StoreToZarr(
store_name='proto-a.zarr',
store_name='small.zarr',
#FIXME: This is brittle. it needs to be named exactly like in meta.yaml...
# Can we inject this in the same way as the root?
# Maybe its better to find another way and avoid injections entirely...
combine_dims=pattern_a.combine_dim_keys,
)
|InjectAttrs(injection_attrs)
|ConsolidateDimensionCoordinates()
|ConsolidateMetadata()
|Copy(target_prefix=latest_store_prefix)
|Copy(target_prefix=latest_data_store_prefix)
)

proto_b = (
# larger recipe
large = (
beam.Create(pattern_b.items())
| OpenURLWithFSSpec()
| OpenWithXarray()
| StoreToZarr(
store_name='proto-b.zarr',
store_name='large.zarr',
combine_dims=pattern_b.combine_dim_keys,
)
|InjectAttrs(injection_attrs)
|ConsolidateDimensionCoordinates()
|ConsolidateMetadata()
|Copy(target_prefix=latest_store_prefix)
|Copy(target_prefix=latest_data_store_prefix)
)
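To sanity-check the injection after a pipeline run, one could open the copied store and read its attributes back. A minimal sketch, assuming gcsfs credentials are available; the store path below is derived from `latest_data_store_prefix` in global_config.json plus the store name, not taken from any file in this PR:

```python
import zarr

# Hypothetical output location: <latest_data_store_prefix>/<store_name>
store_path = "gs://leap-scratch/jbusecke/proto_feedstock/latest/data/large.zarr"

group = zarr.open(store_path, mode="r")  # opening gs:// URLs requires gcsfs
for key in ("latest_data_updated_timestamp", "latest_data_updated_git_hash", "ref_meta.yaml"):
    print(key, "->", group.attrs.get(key))
```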
3 changes: 2 additions & 1 deletion global_config.json
@@ -1,3 +1,4 @@
{
"latest_store_prefix":"gs://leap-scratch/jbusecke/proto_feedstock/latest"
"latest_data_store_prefix":"gs://leap-scratch/jbusecke/proto_feedstock/latest/data",
"latest_catalog_store_prefix":"gs://leap-scratch/jbusecke/proto_feedstock/latest/catalog"
}
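A consumer can resolve concrete store paths from this config by combining a prefix with a recipe id; the helper below is illustrative, not part of the repo:

```python
import json

def resolve_store_path(recipe_id: str, kind: str = "data") -> str:
    """Map a recipe id to its zarr store under the configured prefix."""
    with open("global_config.json") as f:
        cfg = json.load(f)
    prefix = cfg[f"latest_{kind}_store_prefix"]  # kind is "data" or "catalog"
    return f"{prefix}/{recipe_id}.zarr"

# resolve_store_path("large") ->
#   "gs://leap-scratch/jbusecke/proto_feedstock/latest/data/large.zarr"
```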
7 changes: 4 additions & 3 deletions scripts/update_attrs.py
@@ -11,9 +11,10 @@

with open('global_config.json') as f:
global_config = json.load(f)
latest_store_prefix = global_config['latest_store_prefix']

print(f"Using {latest_store_prefix=}")
latest_data_store_prefix = global_config['latest_data_store_prefix']

print(f"Using {latest_data_store_prefix=}")

fs = gcsfs.GCSFileSystem()

@@ -44,7 +45,7 @@ def update_zarr_attrs(store_path:str, additonal_attrs:dict):
assert 'object' in recipe.keys() #(FIXME: this does not support dict objects yet...)

# Somehow get the store path from the recipe
store_path = f'{latest_store_prefix}/{id}.zarr'
store_path = f'{latest_data_store_prefix}/{id}.zarr'
# This only works if the store name in the recipe is the same as the id...which is easy to get wrong!!!

# add the info from the top level of the meta.yaml
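On the brittleness noted above (and the TODO in recipe.py about stores that might not exist): one way to fail loudly before mutating attributes is an explicit existence check via gcsfs. A sketch under the assumption that the stores live on GCS; `open_store_checked` is a hypothetical helper, not part of this script:

```python
import gcsfs
import zarr

fs = gcsfs.GCSFileSystem()

def open_store_checked(store_path: str):
    """Open an existing zarr store for attribute edits, raising if it is absent."""
    if not fs.exists(store_path):
        raise FileNotFoundError(f"No zarr store found at {store_path}")
    return zarr.open(fs.get_mapper(store_path), mode="a")
```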
