Skip to content

Commit

Permalink
Merge pull request #6 from ecmwf/feature/xarray
Browse files Browse the repository at this point in the history
Feature/xarray
  • Loading branch information
b8raoult authored Jul 25, 2024
2 parents 46e2a24 + b4afe2e commit 7765a5d
Show file tree
Hide file tree
Showing 52 changed files with 2,716 additions and 1,012 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ repos:
rev: v0.4.6
hooks:
- id: ruff
# Next line if for documenation cod snippets
exclude: '^[^_].*_\.py$'
# Next line is to exclude for documentation code snippets
exclude: 'docs/(.*/)?[a-z]\w+_.py$'
args:
- --line-length=120
- --fix
Expand Down
1 change: 1 addition & 0 deletions docs/building/sources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The following `sources` are currently available:
sources/mars
sources/grib
sources/netcdf
sources/xarray
sources/opendap
sources/forcings
sources/accumulations
Expand Down
6 changes: 6 additions & 0 deletions docs/building/sources/xarray.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
########
xarray
########

.. literalinclude:: xarray.yaml
:language: yaml
3 changes: 3 additions & 0 deletions docs/building/sources/xarray.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
input:
xarray:
url: https://...
18 changes: 15 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,24 @@ dynamic = [
"version",
]
dependencies = [
"anemoi-utils[provenance]>=0.3.5",
"anemoi-utils[provenance]>=0.3.11",
"numpy",
"pyyaml",
"semantic-version",
"tqdm",
"zarr<=2.17",

"zarr",
]

optional-dependencies.all = [
"aiohttp",
"boto3",
"earthkit-data[mars]>=0.9",
"earthkit-geo>=0.2",
"earthkit-meteo",
"ecmwflibs>=0.6.3",
"entrypoints",
"gcsfs",
"kerchunk",
"pyproj",
"requests",
"s3fs",
Expand All @@ -81,12 +83,15 @@ optional-dependencies.create = [
]

optional-dependencies.dev = [
"aiohttp",
"boto3",
"earthkit-data[mars]>=0.9",
"earthkit-geo>=0.2",
"earthkit-meteo",
"ecmwflibs>=0.6.3",
"entrypoints",
"gcsfs",
"kerchunk",
"nbsphinx",
"pandoc",
"pyproj",
Expand All @@ -108,7 +113,14 @@ optional-dependencies.docs = [
"sphinx-rtd-theme",
]

optional-dependencies.kerchunk = [
"gcsfs",
"kerchunk",
"s3fs",
]

optional-dependencies.remote = [
"aiohttp",
"boto3",
"requests",
"s3fs",
Expand Down
59 changes: 59 additions & 0 deletions src/anemoi/datasets/commands/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
# nor does it submit to any jurisdiction.
#

import numpy as np
import tqdm
import zarr

from anemoi.datasets import open_dataset

from . import Command
Expand All @@ -19,6 +23,8 @@ class Compare(Command):
def add_arguments(self, command_parser):
    """Declare the two dataset paths to compare and the optional comparison modes."""
    for positional in ("dataset1", "dataset2"):
        command_parser.add_argument(positional)
    command_parser.add_argument("--data", action="store_true", help="Compare the data.")
    command_parser.add_argument("--statistics", action="store_true", help="Compare the statistics.")

def run(self, args):
ds1 = open_dataset(args.dataset1)
Expand All @@ -42,5 +48,58 @@ def run(self, args):
f"{ds2.statistics['mean'][ds2.name_to_index[v]]:14g}",
)

if args.data:
print()
print("Data:")
print("-----")
print()

diff = 0
for a, b in tqdm.tqdm(zip(ds1, ds2)):
if not np.array_equal(a, b, equal_nan=True):
diff += 1

print(f"Number of different rows: {diff}/{len(ds1)}")

if args.data:
print()
print("Data 2:")
print("-----")
print()

ds1 = zarr.open(args.dataset1, mode="r")
ds2 = zarr.open(args.dataset2, mode="r")

for name in (
"data",
"count",
"sums",
"squares",
"mean",
"stdev",
"minimum",
"maximum",
"latitudes",
"longitudes",
):
a1 = ds1[name]
a2 = ds2[name]

if len(a1) != len(a2):
print(f"{name}: lengths mismatch {len(a1)} != {len(a2)}")
continue

diff = 0
for a, b in tqdm.tqdm(zip(a1, a2), leave=False):
if not np.array_equal(a, b, equal_nan=True):
if diff == 0:
print(f"\n{name}: first different row:")
print(a[a != b])
print(b[a != b])

diff += 1

print(f"{name}: {diff} different rows out of {len(a1)}")


command = Compare
87 changes: 84 additions & 3 deletions src/anemoi/datasets/commands/create.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,39 @@
from anemoi.datasets.create import Creator
import datetime
import logging
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

import tqdm
from anemoi.utils.humanize import seconds_to_human

from anemoi.datasets.create.trace import enable_trace

from . import Command

LOG = logging.getLogger(__name__)


def task(what, options, *args, **kwargs):
    """Run a single step of dataset creation inside a worker.

    ``Creator`` is imported lazily, on purpose, so that the import happens
    in the worker process/thread and never in the parent process.

    Parameters
    ----------
    what : str
        Name of the ``Creator`` method to invoke (e.g. ``"init"``, ``"load"``).
    options : dict
        Keyword arguments forwarded to the ``Creator`` constructor; if it
        carries a ``"trace"`` key, tracing is configured from its value.

    Returns
    -------
    Whatever the selected ``Creator`` method returns.
    """
    now = datetime.datetime.now()
    LOG.debug(f"Task {what}({args},{kwargs}) starting")

    # Deliberately deferred: must not be imported in the main process.
    from anemoi.datasets.create import Creator

    if "trace" in options:
        enable_trace(options["trace"])

    creator = Creator(**options)
    outcome = getattr(creator, what)(*args, **kwargs)

    LOG.debug(f"Task {what}({args},{kwargs}) completed ({datetime.datetime.now()-now})")
    return outcome


class Create(Command):
"""Create a dataset."""
Expand All @@ -22,12 +54,61 @@ def add_arguments(self, command_parser):
)
command_parser.add_argument("config", help="Configuration yaml file defining the recipe to create the dataset.")
command_parser.add_argument("path", help="Path to store the created data.")
group = command_parser.add_mutually_exclusive_group()
group.add_argument("--threads", help="Use `n` parallel thread workers.", type=int, default=0)
group.add_argument("--processes", help="Use `n` parallel process workers.", type=int, default=0)
command_parser.add_argument("--trace", action="store_true")

def run(self, args):
    """Entry point for the ``create`` command.

    Uses the parallel path when ``--threads`` or ``--processes`` was given
    (the two flags are mutually exclusive, so their sum is simply whichever
    worker count was supplied), otherwise builds the dataset serially, then
    logs the total elapsed wall time.
    """
    # Fix: removed stale pre-change lines left over from the diff rendering
    # (`kwargs = vars(args)` / `c = Creator(**kwargs)`) which made the body
    # incoherent — `Creator` is intentionally imported only in the helpers.
    now = time.time()
    if args.threads + args.processes:
        self.parallel_create(args)
    else:
        self.serial_create(args)
    LOG.info(f"Create completed in {seconds_to_human(time.time()-now)}")
def serial_create(self, args):
    """Build the whole dataset in this process, one part after another."""
    # Imported here (not at module level) so the heavy module is only
    # loaded when the serial path is actually taken.
    from anemoi.datasets.create import Creator

    Creator(**vars(args)).create()

def parallel_create(self, args):
    """Some modules, like fsspec, do not work well with fork().
    Other modules may not be thread safe. So we implement
    parallel loading using multiprocessing before any
    of the modules are imported.
    """

    # vars() returns the live namespace __dict__, so the
    # `use_threads` attribute set below is visible through `options` too.
    options = vars(args)
    # --threads and --processes are declared mutually exclusive, so this
    # sum is simply whichever worker count the user supplied.
    parallel = args.threads + args.processes
    args.use_threads = args.threads > 0

    if args.use_threads:
        ExecutorClass = ThreadPoolExecutor
    else:
        ExecutorClass = ProcessPoolExecutor

    # Phase 1: "init" returns the number of parts the load is split into.
    with ExecutorClass(max_workers=1) as executor:
        total = executor.submit(task, "init", options).result()

    futures = []

    # Phase 2: load all parts in parallel; parts are 1-based "i/total" strings.
    with ExecutorClass(max_workers=parallel) as executor:
        for n in range(total):
            futures.append(executor.submit(task, "load", options, parts=f"{n+1}/{total}"))

        for future in tqdm.tqdm(
            as_completed(futures), desc="Loading", total=len(futures), colour="green", position=parallel + 1
        ):
            future.result()  # re-raises any exception from the worker

    # Phase 3: finalisation steps, run sequentially with a single worker.
    with ExecutorClass(max_workers=1) as executor:
        executor.submit(task, "statistics", options).result()
        executor.submit(task, "additions", options).result()
        executor.submit(task, "cleanup", options).result()
        executor.submit(task, "verify", options).result()


command = Create
2 changes: 1 addition & 1 deletion src/anemoi/datasets/commands/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def print_sizes(self, size):
if total_size is not None:
print(f"💽 Size : {bytes(total_size)} ({bytes_to_human(total_size)})")
if n is not None:
print(f"📁 Files : {n}")
print(f"📁 Files : {n:,}")

@property
def statistics(self):
Expand Down
Loading

0 comments on commit 7765a5d

Please sign in to comment.