diff --git a/hdc/algo/_version.py b/hdc/algo/_version.py index f735d11..495f545 100644 --- a/hdc/algo/_version.py +++ b/hdc/algo/_version.py @@ -1,3 +1,3 @@ """Version only in this file.""" -__version__ = "0.4.0" +__version__ = "0.5.0" diff --git a/hdc/algo/accessors.py b/hdc/algo/accessors.py index 2f8cc3b..d79006e 100644 --- a/hdc/algo/accessors.py +++ b/hdc/algo/accessors.py @@ -7,11 +7,11 @@ import dask.array as da from dask.base import tokenize import numpy as np -import pandas as pd import xarray from . import ops from .dekad import Dekad +from .utils import get_calibration_indices, to_linspace __all__ = [ "Anomalies", @@ -463,20 +463,19 @@ class PixelAlgorithms(AccessorBase): def spi( self, - calibration_start: Optional[str] = None, - calibration_stop: Optional[str] = None, + calibration_begin: Optional[str] = None, + calibration_end: Optional[str] = None, nodata: Optional[Union[float, int]] = None, - groups: Optional[Iterable[int]] = None, + groups: Optional[Iterable[Union[int, float, str]]] = None, dtype="int16", ): """Calculate the SPI along the time dimension. Calculates the Standardized Precipitation Index along the time dimension. - Optionally, a calibration start and / or stop date can be provided which + Optionally, a calibration begin and / or end date can be provided which determine the part of the timeseries used to fit the gamma distribution. - `groups` can be supplied as list of group labels (attention, they are required - to be in format {0..n-1} where n is the number of unique groups. + `groups` can be supplied as list of group labels. If `groups` is supplied, the SPI will be computed for each individual group. This is intended to be used when SPI should be calculated for specific timesteps. """ @@ -497,33 +496,31 @@ def spi( tix = self._obj.get_index("time") - calstart_ix = 0 - if calibration_start is not None: - calstart = pd.Timestamp(calibration_start) - if calstart > tix[-1]: - raise ValueError( - "Calibration start cannot be greater than last timestamp!" - ) - (calstart_ix,) = tix.get_indexer([calstart], method="bfill") + if calibration_begin is None: + calibration_begin = tix[0] - calstop_ix = tix.size - if calibration_stop is not None: - calstop = pd.Timestamp(calibration_stop) - if calstop < tix[0]: - raise ValueError( - "Calibration stop cannot be smaller than first timestamp!" - ) - (calstop_ix,) = tix.get_indexer([calstop], method="ffill") + 1 + if calibration_end is None: + calibration_end = tix[-1] - if calstart_ix >= calstop_ix: - raise ValueError("calibration_start < calibration_stop!") + if calibration_begin > tix[-1:]: + raise ValueError("Calibration begin cannot be greater than last timestamp!") - if abs(calstop_ix - calstart_ix) <= 1: - raise ValueError( - "Timeseries too short for calculating SPI. Please adjust calibration period!" - ) + if calibration_end < tix[:1]: + raise ValueError("Calibration end cannot be smaller than first timestamp!") if groups is None: + calstart_ix, calstop_ix = get_calibration_indices( + tix, (calibration_begin, calibration_end) + ) + + if calstart_ix >= calstop_ix: + raise ValueError("calibration_begin < calibration_end!") + + if abs(calstop_ix - calstart_ix) <= 1: + raise ValueError( + "Timeseries too short for calculating SPI. Please adjust calibration period!" + ) + res = xarray.apply_ufunc( gammastd_yxt, self._obj, @@ -540,12 +537,28 @@ def spi( ) else: - groups = np.array(groups) if not isinstance(groups, np.ndarray) else groups - num_groups = np.unique(groups).size - if not groups.dtype.name == "int16": - warn("Casting groups to int16!") - groups = groups.astype("int16") + groups, keys = to_linspace(np.array(groups, dtype="str")) + + if len(groups) != len(self._obj.time): + raise ValueError("Need array of groups same length as time dimension!") + + groups = groups.astype("int16") + num_groups = len(keys) + + cal_indices = get_calibration_indices( + tix, (calibration_begin, calibration_end), groups, num_groups + ) + # assert for mypy + assert isinstance(cal_indices, np.ndarray) + + if np.any(cal_indices[:, 0] >= cal_indices[:, 1]): + raise ValueError("calibration_begin < calibration_end!") + + if np.any(np.diff(cal_indices, axis=1) <= 1): + raise ValueError( + "Timeseries too short for calculating SPI. Please adjust calibration period!" + ) res = xarray.apply_ufunc( gammastd_grp, @@ -553,9 +566,8 @@ def spi( groups, num_groups, nodata, - calstart_ix, - calstop_ix, - input_core_dims=[["time"], ["grps"], [], [], [], []], + cal_indices, + input_core_dims=[["time"], ["grps"], [], [], ["start", "stop"]], output_core_dims=[["time"]], keep_attrs=True, dask="parallelized", @@ -564,8 +576,8 @@ def spi( res.attrs.update( { - "spi_calibration_start": str(tix[calstart_ix].date()), - "spi_calibration_stop": str(tix[calstop_ix - 1].date()), + "spi_calibration_begin": str(tix[tix >= calibration_begin][0]), + "spi_calibration_end": str(tix[tix <= calibration_end][-1]), } ) @@ -806,7 +818,7 @@ def mean( # set null values to nodata value xx = xx.where(xx.notnull(), xx.nodata) - + attrs = xx.attrs num_zones = len(zone_ids) dims = (xx.dims[0], dim_name, "stat") coords = { @@ -849,7 +861,7 @@ def mean( ) return xarray.DataArray( - data=data, dims=dims, coords=coords, attrs={}, name=name + data=data, dims=dims, coords=coords, attrs=attrs, name=name ) diff --git a/hdc/algo/ops/stats.py b/hdc/algo/ops/stats.py index 37a0140..95547b7 100644 --- a/hdc/algo/ops/stats.py +++ b/hdc/algo/ops/stats.py @@ -241,21 +241,28 @@ def gammastd_yxt( @lazycompile( guvectorize( [ - "(int16[:], int16[:], float64, float64, float64, float64, int16[:])", - "(float32[:], int16[:], float64, float64, float64, float64, int16[:])", + "(int16[:], int16[:], float64, float64, int16[:, :], int16[:])", + "(float32[:], int16[:], float64, float64, int16[:, :], int16[:])", ], - "(n),(m),(),(),(),() -> (n)", + "(n),(m),(),(),(o, p) -> (n)", ) ) -def gammastd_grp(xx, groups, num_groups, nodata, cal_start, cal_stop, yy): +def gammastd_grp(xx, groups, num_groups, nodata, cal_indices, yy): """Calculate the gammastd for specific groups. This calculates gammastd across xx for indivual groups defined in `groups`. These need to be in ascending order from 0 to num_groups - 1. + + `cal_indices` is an array of shape (num_groups, 2) where each row + contains the start and end index for the calibration period for each group. """ for grp in range(num_groups): grp_ix = groups == grp + + cal_start = cal_indices[grp, 0] + cal_stop = cal_indices[grp, 1] + pix = xx[grp_ix] if (pix != nodata).sum() == 0: yy[grp_ix] = nodata diff --git a/hdc/algo/utils.py b/hdc/algo/utils.py index 3b65cc2..7fad5cd 100644 --- a/hdc/algo/utils.py +++ b/hdc/algo/utils.py @@ -1,11 +1,15 @@ """hcd-algo utility functions.""" -from typing import List, Tuple +from typing import Iterable, List, Optional, Tuple, Union import numpy as np +from numpy.typing import NDArray +import pandas as pd +DateType = Union[str, pd.Timestamp, np.datetime64] -def to_linspace(x) -> Tuple[np.ndarray, List[int]]: + +def to_linspace(x) -> Tuple[NDArray[(np.int16,)], List[int]]: """Map input array to linear space. Returns array with linear index (0 - n-1) and list of @@ -21,3 +25,47 @@ def to_linspace(x) -> Tuple[np.ndarray, List[int]]: new_pix = np.where(mask, values[idx], 0) return new_pix, list(keys) + + +def get_calibration_indices( + time: pd.DatetimeIndex, + calibration_range: Tuple[DateType, DateType], + groups: Optional[Iterable[Union[int, float, str]]] = None, + num_groups: Optional[int] = None, +) -> Union[Tuple[int, int], np.ndarray]: + """ + Get the calibration indices for a given time range. + + This function returns indices for a calibration period (e.g. used for SPI) + given an index of timestamps and a start & stop date. + If groups are provided, the indices are returned per group, as an + array of shape (num_groups, 2) where the first column is the start index and + the second column is the stop index. + + Parameters: + time: The time index. + start: The start time of the calibration range. + stop: The stop time of the calibration range. + groups: Optional groups to consider for calibration. + num_groups: Optional number of groups to consider for calibration. + """ + begin, end = calibration_range + + def _get_ix(x: NDArray[(np.datetime64,)], v: DateType, side: str): + return x.searchsorted(np.datetime64(v), side) # type: ignore + + if groups is not None: + if num_groups is None: + num_groups = len(np.unique(np.array(groups))) + return np.array( + [ + [ + _get_ix(time[groups == ix].values, begin, "left"), + _get_ix(time[groups == ix].values, end, "right"), + ] + for ix in range(num_groups) + ], + dtype="int16", + ) + + return _get_ix(time.values, begin, "left"), _get_ix(time.values, end, "right") diff --git a/tests/test_accessors.py b/tests/test_accessors.py index 1c331d7..c2717f9 100644 --- a/tests/test_accessors.py +++ b/tests/test_accessors.py @@ -205,6 +205,12 @@ def test_algo_spi_grouped(darr, res_spi): np.testing.assert_array_equal(_res, res_spi) +def test_algo_spi_grouped_2(darr, res_spi): + _res = darr.astype("float32").hdc.algo.spi(groups=["D1", "D1", "D1", "D1", "D1"]) + assert isinstance(_res, xr.DataArray) + np.testing.assert_array_equal(_res, res_spi) + + def test_algo_spi_transp(darr, res_spi): _darr = darr.transpose(..., "time") _res = _darr.hdc.algo.spi() @@ -214,30 +220,30 @@ def test_algo_spi_transp(darr, res_spi): def test_algo_spi_attrs_default(darr): _res = darr.hdc.algo.spi() - assert _res.attrs["spi_calibration_start"] == str(darr.time.dt.date[0].values) - assert _res.attrs["spi_calibration_stop"] == str(darr.time.dt.date[-1].values) + assert _res.attrs["spi_calibration_begin"] == str(darr.time.to_index()[0]) + assert _res.attrs["spi_calibration_end"] == str(darr.time.to_index()[-1]) def test_algo_spi_attrs_start(darr): - _res = darr.hdc.algo.spi(calibration_start="2000-01-02") - assert _res.attrs["spi_calibration_start"] == "2000-01-11" + _res = darr.hdc.algo.spi(calibration_begin="2000-01-02") + assert _res.attrs["spi_calibration_begin"] == "2000-01-11 00:00:00" def test_algo_spi_attrs_stop(darr): - _res = darr.hdc.algo.spi(calibration_stop="2000-02-09") - assert _res.attrs["spi_calibration_stop"] == "2000-01-31" + _res = darr.hdc.algo.spi(calibration_end="2000-02-09") + assert _res.attrs["spi_calibration_end"] == "2000-01-31 00:00:00" def test_algo_spi_decoupled_1(darr, res_spi): _res = darr.hdc.algo.spi( - calibration_start="2000-01-01", calibration_stop="2000-02-10" + calibration_begin="2000-01-01", calibration_end="2000-02-10" ) assert isinstance(_res, xr.DataArray) np.testing.assert_array_equal(_res, res_spi) - assert _res.attrs["spi_calibration_start"] == "2000-01-01" - assert _res.attrs["spi_calibration_stop"] == "2000-02-10" + assert _res.attrs["spi_calibration_begin"] == "2000-01-01 00:00:00" + assert _res.attrs["spi_calibration_end"] == "2000-02-10 00:00:00" def test_algo_spi_decoupled_2(darr): @@ -249,14 +255,14 @@ def test_algo_spi_decoupled_2(darr): ) _res = darr.hdc.algo.spi( - calibration_start="2000-01-01", calibration_stop="2000-01-31" + calibration_begin="2000-01-01", calibration_end="2000-01-31" ) assert isinstance(_res, xr.DataArray) np.testing.assert_array_equal(_res, res_spi) - assert _res.attrs["spi_calibration_start"] == "2000-01-01" - assert _res.attrs["spi_calibration_stop"] == "2000-01-31" + assert _res.attrs["spi_calibration_begin"] == "2000-01-01 00:00:00" + assert _res.attrs["spi_calibration_end"] == "2000-01-31 00:00:00" def test_algo_spi_decoupled_3(darr): @@ -267,13 +273,13 @@ def test_algo_spi_decoupled_3(darr): ] ) - _res = darr.hdc.algo.spi(calibration_start="2000-01-11") + _res = darr.hdc.algo.spi(calibration_begin="2000-01-11") assert isinstance(_res, xr.DataArray) np.testing.assert_array_equal(_res, res_spi) - assert _res.attrs["spi_calibration_start"] == "2000-01-11" - assert _res.attrs["spi_calibration_stop"] == "2000-02-10" + assert _res.attrs["spi_calibration_begin"] == "2000-01-11 00:00:00" + assert _res.attrs["spi_calibration_end"] == str(darr.time.to_index()[-1]) def test_algo_spi_nodata(darr): @@ -286,30 +292,30 @@ def test_algo_spi_nodata(darr): def test_algo_spi_decoupled_err_1(darr): with pytest.raises(ValueError): _res = darr.hdc.algo.spi( - calibration_start="2000-03-01", + calibration_begin="2000-03-01", ) def test_algo_spi_decoupled_err_2(darr): with pytest.raises(ValueError): _res = darr.hdc.algo.spi( - calibration_stop="1999-01-01", + calibration_end="1999-01-01", ) def test_algo_spi_decoupled_err_3(darr): with pytest.raises(ValueError): _res = darr.hdc.algo.spi( - calibration_start="2000-01-01", - calibration_stop="2000-01-01", + calibration_begin="2000-01-01", + calibration_end="2000-01-01", ) def test_algo_spi_decoupled_err_4(darr): with pytest.raises(ValueError): _res = darr.hdc.algo.spi( - calibration_start="2000-02-01", - calibration_stop="2000-01-01", + calibration_begin="2000-02-01", + calibration_end="2000-01-01", ) @@ -965,6 +971,7 @@ def test_zonal_mean(darr, zones): assert list(x.coords["stat"].values) == ["mean", "valid"] np.testing.assert_almost_equal(x, res) assert x.dtype == res.dtype + assert x.nodata == darr.nodata def test_zonal_mean_nodata(darr, zones): @@ -985,6 +992,7 @@ def test_zonal_mean_nodata(darr, zones): x = darr.hdc.zonal.mean(zones, z_ids) np.testing.assert_almost_equal(x, res) assert x.dtype == res.dtype + assert x.nodata == darr.nodata def test_zonal_mean_nodata_nan(darr, zones): @@ -994,6 +1002,7 @@ def test_zonal_mean_nodata_nan(darr, zones): x = darr.hdc.zonal.mean(zones, z_ids) assert np.isnan(x.data[[0, -1], :, 0]).all() assert np.all(x.data[[0, -1], :, 1] == 0) + assert x.nodata == darr.nodata def test_zonal_mean_nodata_nan_float(darr, zones): @@ -1015,6 +1024,7 @@ def test_zonal_mean_nodata_nan_float(darr, zones): x = darr.hdc.zonal.mean(zones, z_ids, dtype="float64") np.testing.assert_almost_equal(x, res) assert x.dtype == res.dtype + assert x.nodata == darr.nodata def test_zonal_zone_nodata_nan(darr, zones): @@ -1034,6 +1044,7 @@ def test_zonal_zone_nodata_nan(darr, zones): x = darr.hdc.zonal.mean(zones, z_ids, dim_name="foo", dtype="float64") np.testing.assert_almost_equal(x, res) assert x.dtype == res.dtype + assert x.nodata == darr.nodata def test_zonal_dimname(darr, zones): diff --git a/tests/test_algos.py b/tests/test_algos.py index 9aeb6a4..3aaadcf 100644 --- a/tests/test_algos.py +++ b/tests/test_algos.py @@ -169,7 +169,8 @@ def test_gammastd_selfit_2(ts): def test_gammastd_grp(ts): tts = np.repeat(ts, 5).astype("float32") grps = np.tile(np.arange(5), 10).astype("int16") - xspi = gammastd_grp(tts, grps, np.unique(grps).size, 0, 0, 10) + indices = np.array([[0, 10]] * 10, dtype="int16") + xspi = gammastd_grp(tts, grps, np.unique(grps).size, 0, indices) res = [ -0.38238713, diff --git a/tests/test_utils.py b/tests/test_utils.py index 0714876..0b4acdd 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,9 +1,10 @@ """Test utility functions.""" import numpy as np +import pandas as pd import pytest -from hdc.algo.utils import to_linspace +from hdc.algo.utils import get_calibration_indices, to_linspace @pytest.mark.parametrize( @@ -27,3 +28,47 @@ def test_to_linspace(input_array, expected_output): x, y = to_linspace(input_array) np.testing.assert_equal(x, expected_output[0]) np.testing.assert_equal(y, expected_output[1]) + + +def test_get_calibration_indices(): + tix = pd.date_range("2010-01-01", "2010-12-31") + + assert get_calibration_indices(tix, (tix[0], tix[-1])) == (0, 365) + assert get_calibration_indices(tix, ("2010-01-01", "2010-12-31")) == (0, 365) + assert get_calibration_indices(tix, ("2010-01-15", "2010-12-15")) == (14, 349) + + # groups + res = np.array( + [ + [14, 31], + [0, 28], + [0, 31], + [0, 30], + [0, 31], + [0, 30], + [0, 31], + [0, 31], + [0, 30], + [0, 31], + [0, 30], + [0, 15], + ], + dtype="int16", + ) + + np.testing.assert_array_equal( + get_calibration_indices( + tix, + ("2010-01-15", "2010-12-15"), + groups=tix.month.values - 1, + num_groups=12, + ), + res, + ) + + np.testing.assert_array_equal( + get_calibration_indices( + tix, ("2010-01-15", "2010-12-15"), groups=tix.month.values - 1 + ), + res, + )