zarr-python v3 compatibility #516

Draft
wants to merge 16 commits into base: main
8 changes: 4 additions & 4 deletions kerchunk/combine.py
@@ -203,7 +203,7 @@ def append(
ds = xr.open_dataset(
fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False}
)
z = zarr.open(fs.get_mapper())
z = zarr.open(fs.get_mapper(), zarr_format=2)
mzz = MultiZarrToZarr(
path,
out=fs.references, # dict or parquet/lazy
@@ -360,7 +360,7 @@ def first_pass(self):
fs._dircache_from_items()

logger.debug("First pass: %s", i)
z = zarr.open_group(fs.get_mapper(""))
z = zarr.open_group(fs.get_mapper(""), zarr_format=2)
for var in self.concat_dims:
value = self._get_value(i, z, var, fn=self._paths[i])
if isinstance(value, np.ndarray):
@@ -387,7 +387,7 @@ def store_coords(self):
"""
kv = {}
store = zarr.storage.KVStore(kv)
group = zarr.open(store)
group = zarr.open(store, zarr_format=2)
m = self.fss[0].get_mapper("")
z = zarr.open(m)
for k, v in self.coos.items():
@@ -461,7 +461,7 @@ def second_pass(self):
for i, fs in enumerate(self.fss):
to_download = {}
m = fs.get_mapper("")
z = zarr.open(m)
z = zarr.open(m, zarr_format=2)

if no_deps is None:
# done first time only
7 changes: 4 additions & 3 deletions kerchunk/fits.py
@@ -8,7 +8,7 @@
from fsspec.implementations.reference import LazyReferenceMapper


from kerchunk.utils import class_factory
from kerchunk.utils import class_factory, dict_to_store
from kerchunk.codecs import AsciiTableCodec, VarArrCodec

try:
@@ -72,7 +72,7 @@ def process_file(

storage_options = storage_options or {}
out = out or {}
g = zarr.open(out)
store = dict_to_store(out)
g = zarr.open_group(store=store, zarr_format=2)

with fsspec.open(url, mode="rb", **storage_options) as f:
infile = fits.open(f, do_not_scale_image_data=True)
@@ -164,7 +165,7 @@ def process_file(
# TODO: we could sub-chunk on biggest dimension
name = hdu.name or str(ext)
arr = g.empty(
name, dtype=dtype, shape=shape, chunks=shape, compression=None, **kwargs
name=name, dtype=dtype, shape=shape, chunks=shape, compressor=None, zarr_format=2, **kwargs
)
arr.attrs.update(
{
4 changes: 2 additions & 2 deletions kerchunk/grib2.py
@@ -191,7 +191,7 @@ def scan_grib(
if good is False:
continue

z = zarr.open_group(store)
z = zarr.open_group(store, zarr_format=2)
global_attrs = {
f"GRIB_{k}": m[k]
for k in cfgrib.dataset.GLOBAL_ATTRIBUTES_KEYS
@@ -398,7 +398,7 @@ def grib_tree(

# TODO allow passing a LazyReferenceMapper as output?
zarr_store = {}
zroot = zarr.open_group(store=zarr_store)
zroot = zarr.open_group(store=zarr_store, zarr_format=2)

aggregations: Dict[str, List] = defaultdict(list)
aggregation_dims: Dict[str, Set] = defaultdict(set)
44 changes: 23 additions & 21 deletions kerchunk/hdf.py
@@ -10,7 +10,7 @@
import numcodecs

from .codecs import FillStringsCodec
from .utils import _encode_for_JSON
from .utils import _encode_for_JSON, encode_fill_value, dict_to_store, translate_refs_serializable

try:
import h5py
@@ -21,12 +21,6 @@
"for more details."
)

try:
from zarr.meta import encode_fill_value
except ModuleNotFoundError:
# https://github.com/zarr-developers/zarr-python/issues/2021
from zarr.v2.meta import encode_fill_value

lggr = logging.getLogger("h5-to-zarr")
_HIDDEN_ATTRS = { # from h5netcdf.attrs
"REFERENCE_LIST",
@@ -111,9 +105,9 @@ def __init__(
if vlen_encode not in ["embed", "null", "leave", "encode"]:
raise NotImplementedError
self.vlen = vlen_encode
self.store = out or {}
self._zroot = zarr.group(store=self.store, overwrite=True)

self.store_dict = out or {}
self.store = dict_to_store(self.store_dict)
self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True)
self._uri = url
self.error = error
lggr.debug(f"HDF5 file URI: {self._uri}")
@@ -140,7 +134,6 @@ def translate(self, preserve_linked_dsets=False):
"""
lggr.debug("Translation begins")
self._transfer_attrs(self._h5f, self._zroot)

self._h5f.visititems(self._translator)

if preserve_linked_dsets:
@@ -157,7 +150,8 @@ def translate(self, preserve_linked_dsets=False):
self.store.flush()
return self.store
else:
store = _encode_for_JSON(self.store)
translate_refs_serializable(self.store_dict)
store = _encode_for_JSON(self.store_dict)
return {"version": 1, "refs": store}

def _unref(self, ref):
@@ -465,26 +459,30 @@ def _translator(
if h5py.h5ds.is_scale(h5obj.id) and not cinfo:
return
if h5obj.attrs.get("_FillValue") is not None:
fill = h5obj.attrs.get("_FillValue")
fill = encode_fill_value(
h5obj.attrs.get("_FillValue"), dt or h5obj.dtype
)

# Create a Zarr array equivalent to this HDF5 dataset...
za = self._zroot.require_dataset(
h5obj.name,
adims = self._get_array_dims(h5obj)

# Create a Zarr array equivalent to this HDF5 dataset..
za = self._zroot.require_array(
name=h5obj.name,
shape=h5obj.shape,
dtype=dt or h5obj.dtype,
chunks=h5obj.chunks or False,
fill_value=fill,
compression=None,
compressor=None,
Member commented:

So here, you could reintroduce the compressor

compressor = filters[-1]
filters = filters[:-1]

but obviously it depends on whether there are indeed any filters at all.

It would still need back compat, since filters-only datasets definitely exist.
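
A minimal sketch of that suggestion with the back-compat guard (hypothetical: it assumes the trailing codec in filters really is the compression codec):

if filters:
    # peel the last codec off the HDF5 filter pipeline and pass it to
    # zarr as the compressor; everything before it stays a filter
    compressor = filters[-1]
    filters = filters[:-1]
else:
    # unfiltered datasets: nothing to promote
    compressor = None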

Contributor Author replied:

Yeah, the big issue is that v3 cares about what type of operation it is, and v2 doesn't, so moving them around doesn't necessarily fix that bug.

Member replied:

So there needs to be a change upstream?

filters=filters,
overwrite=True,
attributes={
"_ARRAY_DIMENSIONS": adims,
},
**kwargs,
)
lggr.debug(f"Created Zarr array: {za}")
self._transfer_attrs(h5obj, za)
adims = self._get_array_dims(h5obj)
za.attrs["_ARRAY_DIMENSIONS"] = adims

lggr.debug(f"_ARRAY_DIMENSIONS = {adims}")

if "data" in kwargs:
@@ -496,6 +494,8 @@
if h5obj.fletcher32:
logging.info("Discarding fletcher32 checksum")
v["size"] -= 4
key = str.removeprefix(h5obj.name, "/") + "/" + ".".join(map(str, k))
Member commented:

This is the same as what _chunk_key did? Maybe make it a function with a comment saying it's a copy/reimplementation.

By the way, isn't h5obj.name actually a string, so you could have done h5obj.name.removeprefix()?
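
A minimal sketch of the suggested helper (the name _chunk_key_v2 and its placement are hypothetical; it just factors out the expression above, assuming the default "." dimension separator):

def _chunk_key_v2(dataset_name, chunk_index):
    # copy/reimplementation of what zarr v2's Array._chunk_key did:
    # strip the leading "/" from the dataset path and join the chunk
    # indices with "."
    return dataset_name.removeprefix("/") + "/" + ".".join(map(str, chunk_index))

# usage: key = _chunk_key_v2(h5obj.name, k)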


if (
self.inline
and isinstance(v, dict)
@@ -508,9 +508,10 @@ def _translator(
data.decode("ascii")
except UnicodeDecodeError:
data = b"base64:" + base64.b64encode(data)
self.store[za._chunk_key(k)] = data

self.store_dict[key] = data
else:
self.store[za._chunk_key(k)] = [
self.store_dict[key] = [
self._uri,
v["offset"],
v["size"],
@@ -681,3 +682,4 @@ def _is_netcdf_variable(dataset: h5py.Dataset):

def has_visititems_links():
return hasattr(h5py.Group, "visititems_links")

2 changes: 1 addition & 1 deletion kerchunk/hdf4.py
@@ -144,7 +144,7 @@ def translate(self, filename=None, storage_options=None):
remote_protocol=prot,
remote_options=self.st,
)
g = zarr.open_group("reference://", storage_options=dict(fs=fs))
g = zarr.open_group("reference://", storage_options=dict(fs=fs), zarr_format=2)
refs = {}
for k, v in output.items():
if isinstance(v, dict):
16 changes: 10 additions & 6 deletions kerchunk/netCDF3.py
@@ -1,11 +1,12 @@
from functools import reduce
from packaging.version import Version
from operator import mul

import numpy as np
from fsspec.implementations.reference import LazyReferenceMapper
import fsspec

from kerchunk.utils import _encode_for_JSON, inline_array
from kerchunk.utils import _encode_for_JSON, dict_to_store, inline_array, translate_refs_serializable

try:
from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable
@@ -167,7 +168,9 @@ def translate(self):
import zarr

out = self.out
z = zarr.open(out, mode="w")
store = dict_to_store(out)
z = zarr.open(store, mode="w", zarr_format=2, overwrite=True)

for dim, var in self.variables.items():
if dim in self.chunks:
shape = self.chunks[dim][-1]
@@ -191,13 +194,13 @@ def translate(self):
fill = float(fill)
if fill is not None and var.data.dtype.kind == "i":
fill = int(fill)
arr = z.create_dataset(
arr = z.create_array(
name=dim,
shape=shape,
dtype=var.data.dtype,
fill_value=fill,
chunks=shape,
compression=None,
compressor=None,
)
part = ".".join(["0"] * len(shape)) or "0"
k = f"{dim}/{part}"
@@ -245,13 +248,13 @@ def translate(self):
fill = float(fill)
if fill is not None and base.kind == "i":
fill = int(fill)
arr = z.create_dataset(
arr = z.create_array(
name=name,
shape=shape,
dtype=base,
fill_value=fill,
chunks=(1,) + dtype.shape,
compression=None,
compressor=None,
)
arr.attrs.update(
{
@@ -295,6 +298,7 @@ def translate(self):
out.flush()
return out
else:
translate_refs_serializable(out)
out = _encode_for_JSON(out)
return {"version": 1, "refs": out}

6 changes: 3 additions & 3 deletions kerchunk/tests/test_combine.py
@@ -133,14 +133,14 @@

# simple time arrays - xarray can't make these!
m = fs.get_mapper("time1.zarr")
z = zarr.open(m, mode="w")
z = zarr.open(m, mode="w", zarr_format=2)
ar = z.create_dataset("time", data=np.array([1], dtype="M8[s]"))
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]})
ar = z.create_dataset("data", data=arr)
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]})

m = fs.get_mapper("time2.zarr")
z = zarr.open(m, mode="w")
z = zarr.open(m, mode="w", zarr_format=2)
ar = z.create_dataset("time", data=np.array([2], dtype="M8[s]"))
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]})
ar = z.create_dataset("data", data=arr)
@@ -272,7 +272,7 @@ def test_get_coos(refs, selector, expected):
mzz.first_pass()
assert mzz.coos["time"].tolist() == expected
mzz.store_coords()
g = zarr.open(mzz.out)
g = zarr.open(mzz.out, zarr_format=2)
assert g["time"][:].tolist() == expected
assert dict(g.attrs)

20 changes: 10 additions & 10 deletions kerchunk/tests/test_combine_concat.py
@@ -51,7 +51,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
refs = []
for i, x in enumerate(arrays):
fn = f"{tmpdir}/out{i}.zarr"
g = zarr.open(fn)
g = zarr.open(fn, zarr_format=2)
g.create_dataset("x", data=x, chunks=chunks)
fns.append(fn)
ref = kerchunk.zarr.single_zarr(fn, inline=0)
@@ -62,7 +62,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
)

mapper = fsspec.get_mapper("reference://", fo=out)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()

try:
@@ -76,7 +76,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
remote_protocol="file",
skip_instance_cache=True,
)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()

kerchunk.df.refs_to_dataframe(out, "memory://out.parq", record_size=1)
@@ -86,7 +86,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
remote_protocol="file",
skip_instance_cache=True,
)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()


@@ -95,9 +95,9 @@ def test_fail_chunks(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(10)
x2 = np.arange(10, 20)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(2,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(3,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)
@@ -112,9 +112,9 @@ def test_fail_shape(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(12).reshape(6, 2)
x2 = np.arange(12, 24)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(2,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(2,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)
@@ -129,9 +129,9 @@ def test_fail_irregular_chunk_boundaries(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(10)
x2 = np.arange(10, 24)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(4,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(4,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)