From 8b5333c66fcdc1bd26efa99eea605bb1b7ebe5eb Mon Sep 17 00:00:00 2001 From: Simon De Kock Date: Fri, 15 Nov 2024 07:08:38 +0100 Subject: [PATCH 1/3] Refactoring steps-nowcast (#436) --- .pre-commit-config.yaml | 2 +- pysteps/nowcasts/steps.py | 1714 +++++++++++++++++++++++++------------ 2 files changed, 1175 insertions(+), 541 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6ffe446bf..cb599f234 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/psf/black - rev: 24.8.0 + rev: 24.10.0 hooks: - id: black language_version: python3 diff --git a/pysteps/nowcasts/steps.py b/pysteps/nowcasts/steps.py index 50c6a5d22..806c4082b 100644 --- a/pysteps/nowcasts/steps.py +++ b/pysteps/nowcasts/steps.py @@ -25,6 +25,9 @@ from pysteps.timeseries import autoregression, correlation from pysteps.nowcasts.utils import compute_percentile_mask, nowcast_main_loop +from dataclasses import dataclass, field +from typing import Any, Callable + try: import dask @@ -33,63 +36,18 @@ DASK_IMPORTED = False -@deprecate_args({"R": "precip", "V": "velocity", "R_thr": "precip_thr"}, "1.8.0") -def forecast( - precip, - velocity, - timesteps, - n_ens_members=24, - n_cascade_levels=6, - precip_thr=None, - kmperpixel=None, - timestep=None, - extrap_method="semilagrangian", - decomp_method="fft", - bandpass_filter_method="gaussian", - noise_method="nonparametric", - noise_stddev_adj=None, - ar_order=2, - vel_pert_method="bps", - conditional=False, - probmatching_method="cdf", - mask_method="incremental", - seed=None, - num_workers=1, - fft_method="numpy", - domain="spatial", - extrap_kwargs=None, - filter_kwargs=None, - noise_kwargs=None, - vel_pert_kwargs=None, - mask_kwargs=None, - measure_time=False, - callback=None, - return_output=True, -): +@dataclass +class StepsNowcasterConfig: """ - Generate a nowcast ensemble by using the Short-Term Ensemble Prediction - System (STEPS) method. - Parameters ---------- - precip: array-like - Array of shape (ar_order+1,m,n) containing the input precipitation fields - ordered by timestamp from oldest to newest. The time steps between the - inputs are assumed to be regular. - velocity: array-like - Array of shape (2,m,n) containing the x- and y-components of the advection - field. The velocities are assumed to represent one time step between the - inputs. All values are required to be finite. - timesteps: int or list of floats - Number of time steps to forecast or a list of time steps for which the - forecasts are computed (relative to the input time step). The elements - of the list are required to be in ascending order. + n_ens_members: int, optional The number of ensemble members to generate. n_cascade_levels: int, optional The number of cascade levels to use. Defaults to 6, see issue #385 on GitHub. - precip_thr: float, optional + precip_threshold: float, optional Specifies the threshold value for minimum observable precipitation intensity. Required if mask_method is not None or conditional is True. kmperpixel: float, optional @@ -98,10 +56,10 @@ def forecast( timestep: float, optional Time step of the motion vectors (minutes). Required if vel_pert_method is not None or mask_method is 'incremental'. - extrap_method: str, optional + extrapolation_method: str, optional Name of the extrapolation method to use. See the documentation of pysteps.extrapolation.interface. - decomp_method: {'fft'}, optional + decomposition_method: {'fft'}, optional Name of the cascade decomposition method to use. See the documentation of pysteps.cascade.interface. bandpass_filter_method: {'gaussian', 'uniform'}, optional @@ -120,7 +78,7 @@ def forecast( noise std. dev adjustment. ar_order: int, optional The order of the autoregressive model to use. Must be >= 1. - vel_pert_method: {'bps',None}, optional + velocity_perturbation_method: {'bps',None}, optional Name of the noise generator to use for perturbing the advection field. See the documentation of pysteps.noise.interface. If set to None, the advection field is not perturbed. @@ -159,7 +117,7 @@ def forecast( classical STEPS model). If "spectral", the AR(2) models and stochastic perturbations are applied directly in the spectral domain to reduce memory footprint and improve performance :cite:`PCH2019b`. - extrap_kwargs: dict, optional + extrapolation_kwargs: dict, optional Optional dictionary containing keyword arguments for the extrapolation method. See the documentation of pysteps.extrapolation. filter_kwargs: dict, optional @@ -168,7 +126,7 @@ def forecast( noise_kwargs: dict, optional Optional dictionary containing keyword arguments for the initializer of the noise generator. See the documentation of pysteps.noise.fftgenerators. - vel_pert_kwargs: dict, optional + velocity_perturbation_kwargs: dict, optional Optional dictionary containing keyword arguments 'p_par' and 'p_perp' for the initializer of the velocity perturbator. The choice of the optimal parameters depends on the domain and the used optical flow method. @@ -238,482 +196,848 @@ def forecast( Set to False to disable returning the outputs as numpy arrays. This can save memory if the intermediate results are written to output files using the callback function. - - Returns - ------- - out: ndarray - If return_output is True, a four-dimensional array of shape - (n_ens_members,num_timesteps,m,n) containing a time series of forecast - precipitation fields for each ensemble member. Otherwise, a None value - is returned. The time series starts from t0+timestep, where timestep is - taken from the input precipitation fields. If measure_time is True, the - return value is a three-element tuple containing the nowcast array, the - initialization time of the nowcast generator and the time used in the - main loop (seconds). - - See also - -------- - pysteps.extrapolation.interface, pysteps.cascade.interface, - pysteps.noise.interface, pysteps.noise.utils.compute_noise_stddev_adjs - - References - ---------- - :cite:`Seed2003`, :cite:`BPS2006`, :cite:`SPN2013`, :cite:`PCH2019b` """ - _check_inputs(precip, velocity, timesteps, ar_order) - - if extrap_kwargs is None: - extrap_kwargs = dict() - - if filter_kwargs is None: - filter_kwargs = dict() - - if noise_kwargs is None: - noise_kwargs = dict() - - if vel_pert_kwargs is None: - vel_pert_kwargs = dict() + n_ens_members: int = 24 + n_cascade_levels: int = 6 + precip_threshold: float | None = None + kmperpixel: float | None = None + timestep: float | None = None + extrapolation_method: str = "semilagrangian" + decomposition_method: str = "fft" + bandpass_filter_method: str = "gaussian" + noise_method: str | None = "nonparametric" + noise_stddev_adj: str | None = None + ar_order: int = 2 + velocity_perturbation_method: str | None = "bps" + conditional: bool = False + probmatching_method: str | None = "cdf" + mask_method: str | None = "incremental" + seed: int | None = None + num_workers: int = 1 + fft_method: str = "numpy" + domain: str = "spatial" + extrapolation_kwargs: dict[str, Any] = field(default_factory=dict) + filter_kwargs: dict[str, Any] = field(default_factory=dict) + noise_kwargs: dict[str, Any] = field(default_factory=dict) + velocity_perturbation_kwargs: dict[str, Any] = field(default_factory=dict) + mask_kwargs: dict[str, Any] = field(default_factory=dict) + measure_time: bool = False + callback: Callable[[Any], None] | None = None + return_output: bool = True + + +@dataclass +class StepsNowcasterParams: + fft: Any = None + bandpass_filter: Any = None + extrapolation_method: Any = None + decomposition_method: Any = None + recomposition_method: Any = None + noise_generator: Callable | None = None + perturbation_generator: Callable | None = None + noise_std_coefficients: np.ndarray | None = None + ar_model_coefficients: np.ndarray | None = None # Corresponds to phi + autocorrelation_coefficients: np.ndarray | None = None # Corresponds to gamma + domain_mask: np.ndarray | None = None + structuring_element: np.ndarray | None = None + precipitation_mean: float | None = None + wet_area_ratio: float | None = None + mask_rim: int | None = None + num_ensemble_workers: int = 1 + xy_coordinates: np.ndarray | None = None + velocity_perturbation_parallel: list[float] | None = None + velocity_perturbation_perpendicular: list[float] | None = None + + +@dataclass +class StepsNowcasterState: + precip_forecast: list[Any] | None = field(default_factory=list) + precip_cascades: list[list[np.ndarray]] | None = field(default_factory=list) + precip_decomposed: list[dict[str, Any]] | None = field(default_factory=list) + # The observation mask (where the radar can observe the precipitation) + precip_mask: list[Any] | None = field(default_factory=list) + precip_mask_decomposed: dict[str, Any] | None = field(default_factory=dict) + # The mask around the precipitation fields (to get only non-zero values) + mask_precip: np.ndarray | None = None + mask_threshold: np.ndarray | None = None + random_generator_precip: list[np.random.RandomState] | None = field( + default_factory=list + ) + random_generator_motion: list[np.random.RandomState] | None = field( + default_factory=list + ) + velocity_perturbations: list[Callable] | None = field(default_factory=list) + fft_objects: list[Any] | None = field(default_factory=list) - if mask_kwargs is None: - mask_kwargs = dict() - if np.any(~np.isfinite(velocity)): - raise ValueError("velocity contains non-finite values") +class StepsNowcaster: + def __init__( + self, precip, velocity, time_steps, steps_config: StepsNowcasterConfig + ): + # Store inputs and optional parameters + self.__precip = precip + self.__velocity = velocity + self.__time_steps = time_steps + + # Store the config data: + self.__config = steps_config + + # Store the state and params data: + self.__state = StepsNowcasterState() + self.__params = StepsNowcasterParams() + + # Additional variables for time measurement + self.__start_time_init = None + self.__init_time = None + self.__mainloop_time = None + + def compute_forecast(self): + """ + Generate a nowcast ensemble by using the Short-Term Ensemble Prediction + System (STEPS) method. + + Parameters + ---------- + precip: array-like + Array of shape (ar_order+1,m,n) containing the input precipitation fields + ordered by timestamp from oldest to newest. The time steps between the + inputs are assumed to be regular. + velocity: array-like + Array of shape (2,m,n) containing the x- and y-components of the advection + field. The velocities are assumed to represent one time step between the + inputs. All values are required to be finite. + timesteps: int or list of floats + Number of time steps to forecast or a list of time steps for which the + forecasts are computed (relative to the input time step). The elements + of the list are required to be in ascending order. + config: StepsNowcasterConfig + Provides a set of configuration parameters for the nowcast ensemble generation. + + Returns + ------- + out: ndarray + If return_output is True, a four-dimensional array of shape + (n_ens_members,num_timesteps,m,n) containing a time series of forecast + precipitation fields for each ensemble member. Otherwise, a None value + is returned. The time series starts from t0+timestep, where timestep is + taken from the input precipitation fields. If measure_time is True, the + return value is a three-element tuple containing the nowcast array, the + initialization time of the nowcast generator and the time used in the + main loop (seconds). + + See also + -------- + pysteps.extrapolation.interface, pysteps.cascade.interface, + pysteps.noise.interface, pysteps.noise.utils.compute_noise_stddev_adjs + + References + ---------- + :cite:`Seed2003`, :cite:`BPS2006`, :cite:`SPN2013`, :cite:`PCH2019b` + """ + self.__check_inputs() + self.__print_forecast_info() + # Measure time for initialization + if self.__config.measure_time: + self.__start_time_init = time.time() + + self.__initialize_nowcast_components() + # Slice the precipitation field to only use the last ar_order + 1 fields + self.__precip = self.__precip[-(self.__config.ar_order + 1) :, :, :].copy() + + self.__perform_extrapolation() + self.__apply_noise_and_ar_model() + self.__initialize_velocity_perturbations() + self.__initialize_precipitation_mask() + self.__initialize_fft_objects() + # Measure and print initialization time + if self.__config.measure_time: + self.__measure_time("Initialization", self.__start_time_init) + + # Run the main nowcast loop + self.__nowcast_main() + + if self.__config.measure_time: + self.__state.precip_forecast, self.__mainloop_time = ( + self.__state.precip_forecast + ) - if mask_method not in ["obs", "sprog", "incremental", None]: - raise ValueError( - "unknown mask method %s: must be 'obs', 'sprog' or 'incremental' or None" - % mask_method + # Stack and return the forecast output + if self.__config.return_output: + self.__state.precip_forecast = np.stack( + [ + np.stack(self.__state.precip_forecast[j]) + for j in range(self.__config.n_ens_members) + ] + ) + if self.__config.measure_time: + return ( + self.__state.precip_forecast, + self.__init_time, + self.__mainloop_time, + ) + else: + return self.__state.precip_forecast + else: + return None + + def __nowcast_main(self): + """ + Main nowcast loop that iterates through the ensemble members and time steps + to generate forecasts. + """ + # Isolate the last time slice of precipitation + precip = self.__precip[ + -1, :, : + ] # Extract the last available precipitation field + + # Prepare state and params dictionaries, these need to be formatted a specific way for the nowcast_main_loop + state = self.__initialize_state() + params = self.__initialize_params(precip) + + print("Starting nowcast computation.") + + # Run the nowcast main loop + self.__state.precip_forecast = nowcast_main_loop( + precip, + self.__velocity, + state, + self.__time_steps, + self.__config.extrapolation_method, + self.__update_state, # Reference to the update function + extrap_kwargs=self.__config.extrapolation_kwargs, + velocity_pert_gen=self.__state.velocity_perturbations, + params=params, + ensemble=True, + num_ensemble_members=self.__config.n_ens_members, + callback=self.__config.callback, + return_output=self.__config.return_output, + num_workers=self.__params.num_ensemble_workers, + measure_time=self.__config.measure_time, ) - if precip_thr is None: - if conditional: - raise ValueError("conditional = True but precip_thr not specified") - - if mask_method is not None: - raise ValueError("mask_method is not None but precip_thr not specified") - - if probmatching_method == "mean": + def __check_inputs(self): + """ + Validate the inputs to ensure consistency and correct shapes. + """ + if self.__precip.ndim != 3: + raise ValueError("precip must be a three-dimensional array") + if self.__precip.shape[0] < self.__config.ar_order + 1: raise ValueError( - "probmatching_method = 'mean' but precip_thr not specified" + f"precip.shape[0] must be at least ar_order+1, " + f"but found {self.__precip.shape[0]}" ) - - if noise_method is not None and noise_stddev_adj == "auto": - raise ValueError("noise_stddev_adj = 'auto' but precip_thr not specified") - - if noise_stddev_adj not in ["auto", "fixed", None]: - raise ValueError( - "unknown noise_std_dev_adj method %s: must be 'auto', 'fixed', or None" - % noise_stddev_adj - ) - - if kmperpixel is None: - if vel_pert_method is not None: - raise ValueError("vel_pert_method is set but kmperpixel=None") - if mask_method == "incremental": - raise ValueError("mask_method='incremental' but kmperpixel=None") - - if timestep is None: - if vel_pert_method is not None: - raise ValueError("vel_pert_method is set but timestep=None") - if mask_method == "incremental": - raise ValueError("mask_method='incremental' but timestep=None") - - print("Computing STEPS nowcast") - print("-----------------------") - print("") - - print("Inputs") - print("------") - print(f"input dimensions: {precip.shape[1]}x{precip.shape[2]}") - if kmperpixel is not None: - print(f"km/pixel: {kmperpixel}") - if timestep is not None: - print(f"time step: {timestep} minutes") - print("") - - print("Methods") - print("-------") - print(f"extrapolation: {extrap_method}") - print(f"bandpass filter: {bandpass_filter_method}") - print(f"decomposition: {decomp_method}") - print(f"noise generator: {noise_method}") - print("noise adjustment: {}".format(("yes" if noise_stddev_adj else "no"))) - print(f"velocity perturbator: {vel_pert_method}") - print("conditional statistics: {}".format(("yes" if conditional else "no"))) - print(f"precip. mask method: {mask_method}") - print(f"probability matching: {probmatching_method}") - print(f"FFT method: {fft_method}") - print(f"domain: {domain}") - print("") - - print("Parameters") - print("----------") - if isinstance(timesteps, int): - print(f"number of time steps: {timesteps}") - else: - print(f"time steps: {timesteps}") - print(f"ensemble size: {n_ens_members}") - print(f"parallel threads: {num_workers}") - print(f"number of cascade levels: {n_cascade_levels}") - print(f"order of the AR(p) model: {ar_order}") - if vel_pert_method == "bps": - vp_par = vel_pert_kwargs.get("p_par", noise.motion.get_default_params_bps_par()) - vp_perp = vel_pert_kwargs.get( - "p_perp", noise.motion.get_default_params_bps_perp() - ) + if self.__velocity.ndim != 3: + raise ValueError("velocity must be a three-dimensional array") + if self.__precip.shape[1:3] != self.__velocity.shape[1:3]: + raise ValueError( + f"Dimension mismatch between precip and velocity: " + f"shape(precip)={self.__precip.shape}, shape(velocity)={self.__velocity.shape}" + ) + if ( + isinstance(self.__time_steps, list) + and not sorted(self.__time_steps) == self.__time_steps + ): + raise ValueError("timesteps must be in ascending order") + if np.any(~np.isfinite(self.__velocity)): + raise ValueError("velocity contains non-finite values") + if self.__config.mask_method not in ["obs", "sprog", "incremental", None]: + raise ValueError( + f"Unknown mask method '{self.__config.mask_method}'. " + "Must be 'obs', 'sprog', 'incremental', or None." + ) + if self.__config.precip_threshold is None: + if self.__config.conditional: + raise ValueError("conditional=True but precip_thr is not specified.") + if self.__config.mask_method is not None: + raise ValueError("mask_method is set but precip_thr is not specified.") + if self.__config.probmatching_method == "mean": + raise ValueError( + "probmatching_method='mean' but precip_thr is not specified." + ) + if ( + self.__config.noise_method is not None + and self.__config.noise_stddev_adj == "auto" + ): + raise ValueError( + "noise_stddev_adj='auto' but precip_thr is not specified." + ) + if self.__config.noise_stddev_adj not in ["auto", "fixed", None]: + raise ValueError( + f"Unknown noise_stddev_adj method '{self.__config.noise_stddev_adj}'. " + "Must be 'auto', 'fixed', or None." + ) + if self.__config.kmperpixel is None: + if self.__config.velocity_perturbation_method is not None: + raise ValueError("vel_pert_method is set but kmperpixel=None") + if self.__config.mask_method == "incremental": + raise ValueError("mask_method='incremental' but kmperpixel=None") + if self.__config.timestep is None: + if self.__config.velocity_perturbation_method is not None: + raise ValueError("vel_pert_method is set but timestep=None") + if self.__config.mask_method == "incremental": + raise ValueError("mask_method='incremental' but timestep=None") + + # Handle None values for various kwargs + if self.__config.extrapolation_kwargs is None: + self.__config.extrapolation_kwargs = {} + if self.__config.filter_kwargs is None: + self.__config.filter_kwargs = {} + if self.__config.noise_kwargs is None: + self.__config.noise_kwargs = {} + if self.__config.velocity_perturbation_kwargs is None: + self.__config.velocity_perturbation_kwargs = {} + if self.__config.mask_kwargs is None: + self.__config.mask_kwargs = {} + + print("Inputs validated and initialized successfully.") + + def __print_forecast_info(self): + """ + Print information about the forecast setup, including inputs, methods, and parameters. + """ + print("Computing STEPS nowcast") + print("-----------------------") + print("") + + print("Inputs") + print("------") + print(f"input dimensions: {self.__precip.shape[1]}x{self.__precip.shape[2]}") + if self.__config.kmperpixel is not None: + print(f"km/pixel: {self.__config.kmperpixel}") + if self.__config.timestep is not None: + print(f"time step: {self.__config.timestep} minutes") + print("") + + print("Methods") + print("-------") + print(f"extrapolation: {self.__config.extrapolation_method}") + print(f"bandpass filter: {self.__config.bandpass_filter_method}") + print(f"decomposition: {self.__config.decomposition_method}") + print(f"noise generator: {self.__config.noise_method}") print( - f"velocity perturbations, parallel: {vp_par[0]},{vp_par[1]},{vp_par[2]}" + "noise adjustment: {}".format( + ("yes" if self.__config.noise_stddev_adj else "no") + ) ) + print(f"velocity perturbator: {self.__config.velocity_perturbation_method}") print( - f"velocity perturbations, perpendicular: {vp_perp[0]},{vp_perp[1]},{vp_perp[2]}" + "conditional statistics: {}".format( + ("yes" if self.__config.conditional else "no") + ) + ) + print(f"precip. mask method: {self.__config.mask_method}") + print(f"probability matching: {self.__config.probmatching_method}") + print(f"FFT method: {self.__config.fft_method}") + print(f"domain: {self.__config.domain}") + print("") + + print("Parameters") + print("----------") + if isinstance(self.__time_steps, int): + print(f"number of time steps: {self.__time_steps}") + else: + print(f"time steps: {self.__time_steps}") + print(f"ensemble size: {self.__config.n_ens_members}") + print(f"parallel threads: {self.__config.num_workers}") + print(f"number of cascade levels: {self.__config.n_cascade_levels}") + print(f"order of the AR(p) model: {self.__config.ar_order}") + + if self.__config.velocity_perturbation_method == "bps": + self.__params.velocity_perturbation_parallel = ( + self.__config.velocity_perturbation_kwargs.get( + "p_par", noise.motion.get_default_params_bps_par() + ) + ) + self.__params.velocity_perturbation_perpendicular = ( + self.__config.velocity_perturbation_kwargs.get( + "p_perp", noise.motion.get_default_params_bps_perp() + ) + ) + print( + f"velocity perturbations, parallel: {self.__params.velocity_perturbation_parallel[0]},{self.__params.velocity_perturbation_parallel[1]},{self.__params.velocity_perturbation_parallel[2]}" + ) + print( + f"velocity perturbations, perpendicular: {self.__params.velocity_perturbation_perpendicular[0]},{self.__params.velocity_perturbation_perpendicular[1]},{self.__params.velocity_perturbation_perpendicular[2]}" + ) + + if self.__config.precip_threshold is not None: + print(f"precip. intensity threshold: {self.__config.precip_threshold}") + + def __initialize_nowcast_components(self): + """ + Initialize the FFT, bandpass filters, decomposition methods, and extrapolation method. + """ + # Initialize number of ensemble workers + self.__params.num_ensemble_workers = min( + self.__config.n_ens_members, self.__config.num_workers ) - if precip_thr is not None: - print(f"precip. intensity threshold: {precip_thr}") + M, N = self.__precip.shape[1:] # Extract the spatial dimensions (height, width) - num_ensemble_workers = min(n_ens_members, num_workers) + # Initialize FFT method + self.__params.fft = utils.get_method( + self.__config.fft_method, shape=(M, N), n_threads=self.__config.num_workers + ) - if measure_time: - starttime_init = time.time() + # Initialize the band-pass filter for the cascade decomposition + filter_method = cascade.get_method(self.__config.bandpass_filter_method) + self.__params.bandpass_filter = filter_method( + (M, N), + self.__config.n_cascade_levels, + **(self.__config.filter_kwargs or {}), + ) - fft = utils.get_method(fft_method, shape=precip.shape[1:], n_threads=num_workers) + # Get the decomposition method (e.g., FFT) + self.__params.decomposition_method, self.__params.recomposition_method = ( + cascade.get_method(self.__config.decomposition_method) + ) - M, N = precip.shape[1:] + # Get the extrapolation method (e.g., semilagrangian) + self.__params.extrapolation_method = extrapolation.get_method( + self.__config.extrapolation_method + ) - # initialize the band-pass filter - filter_method = cascade.get_method(bandpass_filter_method) - bp_filter = filter_method((M, N), n_cascade_levels, **filter_kwargs) + # Generate the mesh grid for spatial coordinates + x_values, y_values = np.meshgrid(np.arange(N), np.arange(M)) + self.__params.xy_coordinates = np.stack([x_values, y_values]) - decomp_method, recomp_method = cascade.get_method(decomp_method) + # Determine the domain mask from non-finite values in the precipitation data + self.__params.domain_mask = np.logical_or.reduce( + [~np.isfinite(self.__precip[i, :]) for i in range(self.__precip.shape[0])] + ) - extrapolator_method = extrapolation.get_method(extrap_method) + print("Nowcast components initialized successfully.") + + def __perform_extrapolation(self): + """ + Extrapolate (advect) precipitation fields based on the velocity field to align + them in time. This prepares the precipitation fields for autoregressive modeling. + """ + # Determine the precipitation threshold mask if conditional is set + if self.__config.conditional: + self.__state.mask_threshold = np.logical_and.reduce( + [ + self.__precip[i, :, :] >= self.__config.precip_threshold + for i in range(self.__precip.shape[0]) + ] + ) + else: + self.__state.mask_threshold = None - x_values, y_values = np.meshgrid( - np.arange(precip.shape[2]), np.arange(precip.shape[1]) - ) + extrap_kwargs = self.__config.extrapolation_kwargs.copy() + extrap_kwargs["xy_coords"] = self.__params.xy_coordinates + extrap_kwargs["allow_nonfinite_values"] = ( + True if np.any(~np.isfinite(self.__precip)) else False + ) - xy_coords = np.stack([x_values, y_values]) + res = [] - precip = precip[-(ar_order + 1) :, :, :].copy() + def __extrapolate_single_field(precip, i): + # Extrapolate a single precipitation field using the velocity field + return self.__params.extrapolation_method( + precip[i, :, :], + self.__velocity, + self.__config.ar_order - i, + "min", + **extrap_kwargs, + )[-1] + + for i in range(self.__config.ar_order): + if ( + not DASK_IMPORTED + ): # If Dask is not available, perform sequential extrapolation + self.__precip[i, :, :] = __extrapolate_single_field(self.__precip, i) + else: + # If Dask is available, accumulate delayed computations for parallel execution + res.append(dask.delayed(__extrapolate_single_field)(self.__precip, i)) + + # If Dask is available, perform the parallel computation + if DASK_IMPORTED and res: + num_workers_ = min(self.__params.num_ensemble_workers, len(res)) + self.__precip = np.stack( + list(dask.compute(*res, num_workers=num_workers_)) + + [self.__precip[-1, :, :]] + ) - # determine the domain mask from non-finite values - domain_mask = np.logical_or.reduce( - [~np.isfinite(precip[i, :]) for i in range(precip.shape[0])] - ) + print("Extrapolation complete and precipitation fields aligned.") + + def __apply_noise_and_ar_model(self): + """ + Apply noise and autoregressive (AR) models to precipitation cascades. + This method applies the AR model to the decomposed precipitation cascades + and adds noise perturbations if necessary. + """ + # Make a copy of the precipitation data and replace non-finite values + precip = self.__precip.copy() + for i in range(self.__precip.shape[0]): + # Replace non-finite values with the minimum finite value of the precipitation field + precip[i, ~np.isfinite(precip[i, :])] = np.nanmin(precip[i, :]) + # Store the precipitation data back in the object + self.__precip = precip + + # Initialize the noise generator if the noise_method is provided + if self.__config.noise_method is not None: + np.random.seed( + self.__config.seed + ) # Set the random seed for reproducibility + init_noise, generate_noise = noise.get_method(self.__config.noise_method) + self.__params.noise_generator = generate_noise + + self.__params.perturbation_generator = init_noise( + self.__precip, + fft_method=self.__params.fft, + **self.__config.noise_kwargs, + ) - # determine the precipitation threshold mask - if conditional: - mask_thr = np.logical_and.reduce( - [precip[i, :, :] >= precip_thr for i in range(precip.shape[0])] - ) - else: - mask_thr = None - - # advect the previous precipitation fields to the same position with the - # most recent one (i.e. transform them into the Lagrangian coordinates) - extrap_kwargs = extrap_kwargs.copy() - extrap_kwargs["xy_coords"] = xy_coords - extrap_kwargs["allow_nonfinite_values"] = ( - True if np.any(~np.isfinite(precip)) else False - ) + # Handle noise standard deviation adjustments if necessary + if self.__config.noise_stddev_adj == "auto": + print("Computing noise adjustment coefficients... ", end="", flush=True) + if self.__config.measure_time: + starttime = time.time() + + # Compute noise adjustment coefficients + self.__params.noise_std_coefficients = ( + noise.utils.compute_noise_stddev_adjs( + self.__precip[-1, :, :], + self.__config.precip_threshold, + np.min(self.__precip), + self.__params.bandpass_filter, + self.__params.decomposition_method, + self.__params.perturbation_generator, + self.__params.noise_generator, + 20, + conditional=self.__config.conditional, + num_workers=self.__config.num_workers, + seed=self.__config.seed, + ) + ) - res = list() + # Measure and print time taken + if self.__config.measure_time: + self.__measure_time( + "Noise adjustment coefficient computation", starttime + ) + else: + print("done.") + + elif self.__config.noise_stddev_adj == "fixed": + # Set fixed noise adjustment coefficients + func = lambda k: 1.0 / (0.75 + 0.09 * k) + self.__params.noise_std_coefficients = [ + func(k) for k in range(1, self.__config.n_cascade_levels + 1) + ] - def f(precip, i): - return extrapolator_method( - precip[i, :, :], velocity, ar_order - i, "min", **extrap_kwargs - )[-1] + else: + # Default to no adjustment + self.__params.noise_std_coefficients = np.ones( + self.__config.n_cascade_levels + ) + + if self.__config.noise_stddev_adj is not None: + # Print noise std deviation coefficients if adjustments were made + print( + f"noise std. dev. coeffs: {str(self.__params.noise_std_coefficients)}" + ) - for i in range(ar_order): - if not DASK_IMPORTED: - precip[i, :, :] = f(precip, i) else: - res.append(dask.delayed(f)(precip, i)) + # No noise, so set perturbation generator and noise_std_coefficients to None + self.__params.perturbation_generator = None + self.__params.noise_std_coefficients = np.ones( + self.__config.n_cascade_levels + ) # Keep default as 1.0 to avoid breaking AR model + + # Decompose the input precipitation fields + self.__state.precip_decomposed = [] + for i in range(self.__config.ar_order + 1): + precip_ = self.__params.decomposition_method( + self.__precip[i, :, :], + self.__params.bandpass_filter, + mask=self.__state.mask_threshold, + fft_method=self.__params.fft, + output_domain=self.__config.domain, + normalize=True, + compute_stats=True, + compact_output=True, + ) + self.__state.precip_decomposed.append(precip_) - if DASK_IMPORTED: - num_workers_ = len(res) if num_workers > len(res) else num_workers - precip = np.stack( - list(dask.compute(*res, num_workers=num_workers_)) + [precip[-1, :, :]] + # Normalize the cascades and rearrange them into a 4D array + self.__state.precip_cascades = nowcast_utils.stack_cascades( + self.__state.precip_decomposed, self.__config.n_cascade_levels ) + self.__state.precip_decomposed = self.__state.precip_decomposed[-1] + self.__state.precip_decomposed = [ + self.__state.precip_decomposed.copy() + for _ in range(self.__config.n_ens_members) + ] - # replace non-finite values with the minimum value - precip = precip.copy() - for i in range(precip.shape[0]): - precip[i, ~np.isfinite(precip[i, :])] = np.nanmin(precip[i, :]) - - if noise_method is not None: - np.random.seed(seed) - # get methods for perturbations - init_noise, generate_noise = noise.get_method(noise_method) - - # initialize the perturbation generator for the precipitation field - pert_gen = init_noise(precip, fft_method=fft, **noise_kwargs) - - if noise_stddev_adj == "auto": - print("Computing noise adjustment coefficients... ", end="", flush=True) - if measure_time: - starttime = time.time() - - precip_min = np.min(precip) - noise_std_coeffs = noise.utils.compute_noise_stddev_adjs( - precip[-1, :, :], - precip_thr, - precip_min, - bp_filter, - decomp_method, - pert_gen, - generate_noise, - 20, - conditional=True, - num_workers=num_workers, - seed=seed, + # Compute temporal autocorrelation coefficients for each cascade level + self.__params.autocorrelation_coefficients = np.empty( + (self.__config.n_cascade_levels, self.__config.ar_order) + ) + for i in range(self.__config.n_cascade_levels): + self.__params.autocorrelation_coefficients[i, :] = ( + correlation.temporal_autocorrelation( + self.__state.precip_cascades[i], mask=self.__state.mask_threshold + ) ) - if measure_time: - print(f"{time.time() - starttime:.2f} seconds.") - else: - print("done.") - elif noise_stddev_adj == "fixed": - func = lambda k: 1.0 / (0.75 + 0.09 * k) - noise_std_coeffs = [func(k) for k in range(1, n_cascade_levels + 1)] - else: - noise_std_coeffs = np.ones(n_cascade_levels) - - if noise_stddev_adj is not None: - print(f"noise std. dev. coeffs: {str(noise_std_coeffs)}") - else: - pert_gen = None - noise_std_coeffs = None - - # compute the cascade decompositions of the input precipitation fields - precip_decomp = [] - for i in range(ar_order + 1): - precip_ = decomp_method( - precip[i, :, :], - bp_filter, - mask=mask_thr, - fft_method=fft, - output_domain=domain, - normalize=True, - compute_stats=True, - compact_output=True, + nowcast_utils.print_corrcoefs(self.__params.autocorrelation_coefficients) + + # Adjust the lag-2 correlation coefficient if AR(2) model is used + if self.__config.ar_order == 2: + for i in range(self.__config.n_cascade_levels): + self.__params.autocorrelation_coefficients[i, 1] = ( + autoregression.adjust_lag2_corrcoef2( + self.__params.autocorrelation_coefficients[i, 0], + self.__params.autocorrelation_coefficients[i, 1], + ) + ) + + # Estimate the parameters of the AR model using auto-correlation coefficients + self.__params.ar_model_coefficients = np.empty( + (self.__config.n_cascade_levels, self.__config.ar_order + 1) ) - precip_decomp.append(precip_) + for i in range(self.__config.n_cascade_levels): + self.__params.ar_model_coefficients[i, :] = ( + autoregression.estimate_ar_params_yw( + self.__params.autocorrelation_coefficients[i, :] + ) + ) - # normalize the cascades and rearrange them into a four-dimensional array - # of shape (n_cascade_levels,ar_order+1,m,n) for the autoregressive model - precip_cascades = nowcast_utils.stack_cascades(precip_decomp, n_cascade_levels) + nowcast_utils.print_ar_params(self.__params.ar_model_coefficients) - precip_decomp = precip_decomp[-1] - precip_decomp = [precip_decomp.copy() for _ in range(n_ens_members)] + # Discard all except the last ar_order cascades for AR model + self.__state.precip_cascades = [ + self.__state.precip_cascades[i][-self.__config.ar_order :] + for i in range(self.__config.n_cascade_levels) + ] - # compute lag-l temporal autocorrelation coefficients for each cascade level - gamma = np.empty((n_cascade_levels, ar_order)) - for i in range(n_cascade_levels): - gamma[i, :] = correlation.temporal_autocorrelation( - precip_cascades[i], mask=mask_thr - ) + # Stack the cascades into a list containing all ensemble members + self.__state.precip_cascades = [ + [ + self.__state.precip_cascades[j].copy() + for j in range(self.__config.n_cascade_levels) + ] + for _ in range(self.__config.n_ens_members) + ] - nowcast_utils.print_corrcoefs(gamma) - - if ar_order == 2: - # adjust the lag-2 correlation coefficient to ensure that the AR(p) - # process is stationary - for i in range(n_cascade_levels): - gamma[i, 1] = autoregression.adjust_lag2_corrcoef2(gamma[i, 0], gamma[i, 1]) - - # estimate the parameters of the AR(p) model from the autocorrelation - # coefficients - phi = np.empty((n_cascade_levels, ar_order + 1)) - for i in range(n_cascade_levels): - phi[i, :] = autoregression.estimate_ar_params_yw(gamma[i, :]) - - nowcast_utils.print_ar_params(phi) - - # discard all except the p-1 last cascades because they are not needed for - # the AR(p) model - precip_cascades = [precip_cascades[i][-ar_order:] for i in range(n_cascade_levels)] - - # stack the cascades into a list containing all ensemble members - precip_cascades = [ - [precip_cascades[j].copy() for j in range(n_cascade_levels)] - for _ in range(n_ens_members) - ] - - # initialize the random generators - if noise_method is not None: - randgen_prec = [] - randgen_motion = [] - for _ in range(n_ens_members): - rs = np.random.RandomState(seed) - randgen_prec.append(rs) - seed = rs.randint(0, high=1e9) - rs = np.random.RandomState(seed) - randgen_motion.append(rs) - seed = rs.randint(0, high=1e9) - else: - randgen_prec = None - - if vel_pert_method is not None: - init_vel_noise, generate_vel_noise = noise.get_method(vel_pert_method) - - # initialize the perturbation generators for the motion field - velocity_perturbators = [] - for j in range(n_ens_members): - kwargs = { - "randstate": randgen_motion[j], - "p_par": vp_par, - "p_perp": vp_perp, - } - vp = init_vel_noise(velocity, 1.0 / kmperpixel, timestep, **kwargs) - velocity_perturbators.append( - lambda t, vp=vp: generate_vel_noise(vp, t * timestep) + # Initialize random generators if noise_method is provided + if self.__config.noise_method is not None: + self.__state.random_generator_precip = [] + self.__state.random_generator_motion = [] + + for _ in range(self.__config.n_ens_members): + # Create random state for precipitation noise generator + rs = np.random.RandomState(self.__config.seed) + self.__state.random_generator_precip.append(rs) + self.__config.seed = rs.randint( + 0, high=int(1e9) + ) # Update seed after generating + + # Create random state for motion perturbations generator + rs = np.random.RandomState(self.__config.seed) + self.__state.random_generator_motion.append(rs) + self.__config.seed = rs.randint( + 0, high=int(1e9) + ) # Update seed after generating + else: + self.__state.random_generator_precip = None + self.__state.random_generator_motion = None + print("AR model and noise applied to precipitation cascades.") + + def __initialize_velocity_perturbations(self): + """ + Initialize the velocity perturbators for each ensemble member if the velocity + perturbation method is specified. + """ + if self.__config.velocity_perturbation_method is not None: + init_vel_noise, generate_vel_noise = noise.get_method( + self.__config.velocity_perturbation_method ) - else: - velocity_perturbators = None - - precip_forecast = [[] for _ in range(n_ens_members)] - - if probmatching_method == "mean": - mu_0 = np.mean(precip[-1, :, :][precip[-1, :, :] >= precip_thr]) - else: - mu_0 = None - - precip_m = None - precip_m_d = None - war = None - struct = None - mask_rim = None - - if mask_method is not None: - mask_prec = precip[-1, :, :] >= precip_thr - - if mask_method == "sprog": - # compute the wet area ratio and the precipitation mask - war = 1.0 * np.sum(mask_prec) / (precip.shape[1] * precip.shape[2]) - precip_m = [precip_cascades[0][i].copy() for i in range(n_cascade_levels)] - precip_m_d = precip_decomp[0].copy() - elif mask_method == "incremental": - # get mask parameters - mask_rim = mask_kwargs.get("mask_rim", 10) - mask_f = mask_kwargs.get("mask_f", 1.0) - # initialize the structuring element - struct = generate_binary_structure(2, 1) - # iterate it to expand it nxn - n = mask_f * timestep / kmperpixel - struct = iterate_structure(struct, int((n - 1) / 2.0)) - # initialize precip mask for each member - mask_prec = nowcast_utils.compute_dilated_mask(mask_prec, struct, mask_rim) - mask_prec = [mask_prec.copy() for _ in range(n_ens_members)] - else: - mask_prec = None - - if noise_method is None and precip_m is None: - precip_m = [precip_cascades[0][i].copy() for i in range(n_cascade_levels)] - - fft_objs = [] - for _ in range(n_ens_members): - fft_objs.append(utils.get_method(fft_method, shape=precip.shape[1:])) - - if measure_time: - init_time = time.time() - starttime_init - - precip = precip[-1, :, :] - - print("Starting nowcast computation.") - - # the nowcast iteration for each ensemble member - state = { - "fft_objs": fft_objs, - "mask_prec": mask_prec, - "precip_cascades": precip_cascades, - "precip_decomp": precip_decomp, - "precip_m": precip_m, - "precip_m_d": precip_m_d, - "randgen_prec": randgen_prec, - } - params = { - "decomp_method": decomp_method, - "domain": domain, - "domain_mask": domain_mask, - "filter": bp_filter, - "fft": fft, - "generate_noise": generate_noise, - "mask_method": mask_method, - "mask_rim": mask_rim, - "mu_0": mu_0, - "n_cascade_levels": n_cascade_levels, - "n_ens_members": n_ens_members, - "noise_method": noise_method, - "noise_std_coeffs": noise_std_coeffs, - "num_ensemble_workers": num_ensemble_workers, - "phi": phi, - "pert_gen": pert_gen, - "probmatching_method": probmatching_method, - "precip": precip, - "precip_thr": precip_thr, - "recomp_method": recomp_method, - "struct": struct, - "war": war, - } - - precip_forecast = nowcast_main_loop( - precip, - velocity, - state, - timesteps, - extrap_method, - _update, - extrap_kwargs=extrap_kwargs, - velocity_pert_gen=velocity_perturbators, - params=params, - ensemble=True, - num_ensemble_members=n_ens_members, - callback=callback, - return_output=return_output, - num_workers=num_ensemble_workers, - measure_time=measure_time, - ) - if measure_time: - precip_forecast, mainloop_time = precip_forecast - if return_output: - precip_forecast = np.stack( - [np.stack(precip_forecast[j]) for j in range(n_ens_members)] - ) - if measure_time: - return precip_forecast, init_time, mainloop_time + self.__state.velocity_perturbations = [] + for j in range(self.__config.n_ens_members): + kwargs = { + "randstate": self.__state.random_generator_motion[j], + "p_par": self.__config.velocity_perturbation_kwargs.get( + "p_par", self.__params.velocity_perturbation_parallel + ), + "p_perp": self.__config.velocity_perturbation_kwargs.get( + "p_perp", self.__params.velocity_perturbation_perpendicular + ), + } + vp = init_vel_noise( + self.__velocity, + 1.0 / self.__config.kmperpixel, + self.__config.timestep, + **kwargs, + ) + self.__state.velocity_perturbations.append( + lambda t, vp=vp: generate_vel_noise(vp, t * self.__config.timestep) + ) else: - return precip_forecast - else: - return None - - -def _check_inputs(precip, velocity, timesteps, ar_order): - if precip.ndim != 3: - raise ValueError("precip must be a three-dimensional array") - if precip.shape[0] < ar_order + 1: - raise ValueError("precip.shape[0] < ar_order+1") - if velocity.ndim != 3: - raise ValueError("velocity must be a three-dimensional array") - if precip.shape[1:3] != velocity.shape[1:3]: - raise ValueError( - "dimension mismatch between precip and velocity: shape(precip)=%s, shape(velocity)=%s" - % (str(precip.shape), str(velocity.shape)) - ) - if isinstance(timesteps, list) and not sorted(timesteps) == timesteps: - raise ValueError("timesteps is not in ascending order") + self.__state.velocity_perturbations = None + print("Velocity perturbations initialized successfully.") + + def __initialize_precipitation_mask(self): + """ + Initialize the precipitation mask and handle different mask methods (sprog, incremental). + """ + self.__state.precip_forecast = [[] for _ in range(self.__config.n_ens_members)] + + if self.__config.probmatching_method == "mean": + self.__params.precipitation_mean = np.mean( + self.__precip[-1, :, :][ + self.__precip[-1, :, :] >= self.__config.precip_threshold + ] + ) + else: + self.__params.precipitation_mean = None + if self.__config.mask_method is not None: + self.__state.mask_precip = ( + self.__precip[-1, :, :] >= self.__config.precip_threshold + ) + + if self.__config.mask_method == "sprog": + # Compute the wet area ratio and the precipitation mask + self.__params.wet_area_ratio = np.sum(self.__state.mask_precip) / ( + self.__precip.shape[1] * self.__precip.shape[2] + ) + self.__state.precip_mask = [ + self.__state.precip_cascades[0][i].copy() + for i in range(self.__config.n_cascade_levels) + ] + self.__state.precip_mask_decomposed = self.__state.precip_decomposed[ + 0 + ].copy() + + elif self.__config.mask_method == "incremental": + # Get mask parameters + self.__params.mask_rim = self.__config.mask_kwargs.get("mask_rim", 10) + mask_f = self.__config.mask_kwargs.get("mask_f", 1.0) + # Initialize the structuring element + self.__params.structuring_element = generate_binary_structure(2, 1) + # Expand the structuring element based on mask factor and timestep + n = mask_f * self.__config.timestep / self.__config.kmperpixel + self.__params.structuring_element = iterate_structure( + self.__params.structuring_element, int((n - 1) / 2.0) + ) + # Compute and apply the dilated mask for each ensemble member + self.__state.mask_precip = nowcast_utils.compute_dilated_mask( + self.__state.mask_precip, + self.__params.structuring_element, + self.__params.mask_rim, + ) + self.__state.mask_precip = [ + self.__state.mask_precip.copy() + for _ in range(self.__config.n_ens_members) + ] + else: + self.__state.mask_precip = None + + if self.__config.noise_method is None and self.__state.precip_mask is None: + self.__state.precip_mask = [ + self.__state.precip_cascades[0][i].copy() + for i in range(self.__config.n_cascade_levels) + ] + print("Precipitation mask initialized successfully.") + + def __initialize_fft_objects(self): + """ + Initialize FFT objects for each ensemble member. + """ + self.__state.fft_objs = [] + for _ in range(self.__config.n_ens_members): + fft_obj = utils.get_method( + self.__config.fft_method, shape=self.__precip.shape[1:] + ) + self.__state.fft_objs.append(fft_obj) + print("FFT objects initialized successfully.") + + def __initialize_state(self): + """ + Initialize the state dictionary used during the nowcast iteration. + """ + return { + "fft_objs": self.__state.fft_objs, + "mask_prec": self.__state.mask_precip, + "precip_cascades": self.__state.precip_cascades, + "precip_decomp": self.__state.precip_decomposed, + "precip_m": self.__state.precip_mask, + "precip_m_d": self.__state.precip_mask_decomposed, + "randgen_prec": self.__state.random_generator_precip, + } + + def __initialize_params(self, precip): + """ + Initialize the params dictionary used during the nowcast iteration. + """ + return { + "decomp_method": self.__params.decomposition_method, + "domain": self.__config.domain, + "domain_mask": self.__params.domain_mask, + "filter": self.__params.bandpass_filter, + "fft": self.__params.fft, + "generate_noise": self.__params.noise_generator, + "mask_method": self.__config.mask_method, + "mask_rim": self.__params.mask_rim, + "mu_0": self.__params.precipitation_mean, + "n_cascade_levels": self.__config.n_cascade_levels, + "n_ens_members": self.__config.n_ens_members, + "noise_method": self.__config.noise_method, + "noise_std_coeffs": self.__params.noise_std_coefficients, + "num_ensemble_workers": self.__params.num_ensemble_workers, + "phi": self.__params.ar_model_coefficients, + "pert_gen": self.__params.perturbation_generator, + "probmatching_method": self.__config.probmatching_method, + "precip": precip, + "precip_thr": self.__config.precip_threshold, + "recomp_method": self.__params.recomposition_method, + "struct": self.__params.structuring_element, + "war": self.__params.wet_area_ratio, + } + + def __update_state(self, state, params): + """ + Update the state during the nowcasting loop. This function handles the AR model iteration, + noise generation, recomposition, and mask application for each ensemble member. + """ + precip_forecast_out = [None] * params["n_ens_members"] + + # Update the deterministic AR(p) model if noise or sprog mask is used + if params["noise_method"] is None or params["mask_method"] == "sprog": + self.__update_deterministic_ar_model(state, params) + + # Worker function for each ensemble member + def worker(j): + self.__apply_ar_model_to_cascades(j, state, params) + precip_forecast_out[j] = self.__recompose_and_apply_mask(j, state, params) + + # Use Dask for parallel execution if available + if ( + DASK_IMPORTED + and params["n_ens_members"] > 1 + and params["num_ensemble_workers"] > 1 + ): + res = [] + for j in range(params["n_ens_members"]): + res.append(dask.delayed(worker)(j)) + dask.compute(*res, num_workers=params["num_ensemble_workers"]) + else: + for j in range(params["n_ens_members"]): + worker(j) -def _update(state, params): - precip_forecast_out = [None] * params["n_ens_members"] + return np.stack(precip_forecast_out), state - if params["noise_method"] is None or params["mask_method"] == "sprog": + def __update_deterministic_ar_model(self, state, params): + """ + Update the deterministic AR(p) model for each cascade level if noise is disabled + or if the sprog mask is used. + """ for i in range(params["n_cascade_levels"]): - # use a separate AR(p) model for the non-perturbed forecast, - # from which the mask is obtained state["precip_m"][i] = autoregression.iterate_ar_model( state["precip_m"][i], params["phi"][i, :] ) @@ -721,50 +1045,40 @@ def _update(state, params): state["precip_m_d"]["cascade_levels"] = [ state["precip_m"][i][-1] for i in range(params["n_cascade_levels"]) ] + if params["domain"] == "spatial": state["precip_m_d"]["cascade_levels"] = np.stack( state["precip_m_d"]["cascade_levels"] ) + precip_m_ = params["recomp_method"](state["precip_m_d"]) + if params["domain"] == "spectral": precip_m_ = params["fft"].irfft2(precip_m_) if params["mask_method"] == "sprog": state["mask_prec"] = compute_percentile_mask(precip_m_, params["war"]) - def worker(j): + def __apply_ar_model_to_cascades(self, j, state, params): + """ + Apply the AR(p) model to the cascades for each ensemble member, including + noise generation and normalization. + """ + # Generate noise if enabled if params["noise_method"] is not None: - # generate noise field - eps = params["generate_noise"]( - params["pert_gen"], - randstate=state["randgen_prec"][j], - fft_method=state["fft_objs"][j], - domain=params["domain"], - ) - - # decompose the noise field into a cascade - eps = params["decomp_method"]( - eps, - params["filter"], - fft_method=state["fft_objs"][j], - input_domain=params["domain"], - output_domain=params["domain"], - compute_stats=True, - normalize=True, - compact_output=True, - ) + eps = self.__generate_and_decompose_noise(j, state, params) else: eps = None - # iterate the AR(p) model for each cascade level + # Iterate the AR(p) model for each cascade level for i in range(params["n_cascade_levels"]): - # normalize the noise cascade if eps is not None: eps_ = eps["cascade_levels"][i] eps_ *= params["noise_std_coeffs"][i] else: eps_ = None - # apply AR(p) process to cascade level + + # Apply the AR(p) model with or without perturbations if eps is not None or params["vel_pert_method"] is not None: state["precip_cascades"][j][i] = autoregression.iterate_ar_model( state["precip_cascades"][j][i], params["phi"][i, :], eps=eps_ @@ -777,12 +1091,39 @@ def worker(j): eps = None eps_ = None - # compute the recomposed precipitation field(s) from the cascades - # obtained from the AR(p) model(s) + def __generate_and_decompose_noise(self, j, state, params): + """ + Generate and decompose the noise field into cascades for a given ensemble member. + """ + eps = params["generate_noise"]( + params["pert_gen"], + randstate=state["randgen_prec"][j], + fft_method=state["fft_objs"][j], + domain=params["domain"], + ) + + eps = params["decomp_method"]( + eps, + params["filter"], + fft_method=state["fft_objs"][j], + input_domain=params["domain"], + output_domain=params["domain"], + compute_stats=True, + normalize=True, + compact_output=True, + ) + + return eps + + def __recompose_and_apply_mask(self, j, state, params): + """ + Recompose the precipitation field from cascades and apply the precipitation mask. + """ state["precip_decomp"][j]["cascade_levels"] = [ state["precip_cascades"][j][i][-1, :] for i in range(params["n_cascade_levels"]) ] + if params["domain"] == "spatial": state["precip_decomp"][j]["cascade_levels"] = np.stack( state["precip_decomp"][j]["cascade_levels"] @@ -793,34 +1134,24 @@ def worker(j): if params["domain"] == "spectral": precip_forecast = state["fft_objs"][j].irfft2(precip_forecast) + # Apply the precipitation mask if params["mask_method"] is not None: - # apply the precipitation mask to prevent generation of new - # precipitation into areas where it was not originally - # observed - precip_forecast_min = precip_forecast.min() - if params["mask_method"] == "incremental": - precip_forecast = ( - precip_forecast_min - + (precip_forecast - precip_forecast_min) * state["mask_prec"][j] - ) - mask_prec_ = precip_forecast > precip_forecast_min - else: - mask_prec_ = state["mask_prec"] - - # set to min value outside mask - precip_forecast[~mask_prec_] = precip_forecast_min + precip_forecast = self.__apply_precipitation_mask( + precip_forecast, j, state, params + ) + # Adjust the CDF of the forecast to match the observed precipitation field if params["probmatching_method"] == "cdf": - # adjust the CDF of the forecast to match the most recently - # observed precipitation field precip_forecast = probmatching.nonparam_match_empirical_cdf( precip_forecast, params["precip"] ) + # Adjust the mean of the forecast to match the observed mean elif params["probmatching_method"] == "mean": mask = precip_forecast >= params["precip_thr"] mu_fct = np.mean(precip_forecast[mask]) precip_forecast[mask] = precip_forecast[mask] - mu_fct + params["mu_0"] + # Update the mask for incremental method if params["mask_method"] == "incremental": state["mask_prec"][j] = nowcast_utils.compute_dilated_mask( precip_forecast >= params["precip_thr"], @@ -828,21 +1159,324 @@ def worker(j): params["mask_rim"], ) + # Apply the domain mask (set masked areas to NaN) precip_forecast[params["domain_mask"]] = np.nan - precip_forecast_out[j] = precip_forecast + return precip_forecast - if ( - DASK_IMPORTED - and params["n_ens_members"] > 1 - and params["num_ensemble_workers"] > 1 - ): - res = [] - for j in range(params["n_ens_members"]): - res.append(dask.delayed(worker)(j)) - dask.compute(*res, num_workers=params["num_ensemble_workers"]) - else: - for j in range(params["n_ens_members"]): - worker(j) - - return np.stack(precip_forecast_out), state + def __apply_precipitation_mask(self, precip_forecast, j, state, params): + """ + Apply the precipitation mask to prevent new precipitation from generating + in areas where it was not observed. + """ + precip_forecast_min = precip_forecast.min() + + if params["mask_method"] == "incremental": + precip_forecast = ( + precip_forecast_min + + (precip_forecast - precip_forecast_min) * state["mask_prec"][j] + ) + mask_prec_ = precip_forecast > precip_forecast_min + else: + mask_prec_ = state["mask_prec"] + + # Set to min value outside the mask + precip_forecast[~mask_prec_] = precip_forecast_min + + return precip_forecast + + def __measure_time(self, label, start_time): + """ + Measure and print the time taken for a specific part of the process. + + Parameters: + - label: A description of the part of the process being measured. + - start_time: The timestamp when the process started (from time.time()). + """ + if self.__config.measure_time: + elapsed_time = time.time() - start_time + print(f"{label} took {elapsed_time:.2f} seconds.") + + def reset_states_and_params(self): + """ + Reset the internal state and parameters of the nowcaster to allow multiple forecasts. + This method resets the state and params to their initial conditions without reinitializing + the inputs like precip, velocity, time_steps, or config. + """ + # Re-initialize the state and parameters + self.__state = StepsNowcasterState() + self.__params = StepsNowcasterParams() + + # Reset time measurement variables + self.__start_time_init = None + self.__init_time = None + self.__mainloop_time = None + + +# Wrapper function to preserve backward compatibility +@deprecate_args({"R": "precip", "V": "velocity", "R_thr": "precip_thr"}, "1.8.0") +def forecast( + precip, + velocity, + timesteps, + n_ens_members=24, + n_cascade_levels=6, + precip_thr=None, + kmperpixel=None, + timestep=None, + extrap_method="semilagrangian", + decomp_method="fft", + bandpass_filter_method="gaussian", + noise_method="nonparametric", + noise_stddev_adj=None, + ar_order=2, + vel_pert_method="bps", + conditional=False, + probmatching_method="cdf", + mask_method="incremental", + seed=None, + num_workers=1, + fft_method="numpy", + domain="spatial", + extrap_kwargs=None, + filter_kwargs=None, + noise_kwargs=None, + vel_pert_kwargs=None, + mask_kwargs=None, + measure_time=False, + callback=None, + return_output=True, +): + """ + Generate a nowcast ensemble by using the Short-Term Ensemble Prediction + System (STEPS) method. + + Parameters + ---------- + precip: array-like + Array of shape (ar_order+1,m,n) containing the input precipitation fields + ordered by timestamp from oldest to newest. The time steps between the + inputs are assumed to be regular. + velocity: array-like + Array of shape (2,m,n) containing the x- and y-components of the advection + field. The velocities are assumed to represent one time step between the + inputs. All values are required to be finite. + timesteps: int or list of floats + Number of time steps to forecast or a list of time steps for which the + forecasts are computed (relative to the input time step). The elements + of the list are required to be in ascending order. + n_ens_members: int, optional + The number of ensemble members to generate. + n_cascade_levels: int, optional + The number of cascade levels to use. Defaults to 6, see issue #385 + on GitHub. + precip_thr: float, optional + Specifies the threshold value for minimum observable precipitation + intensity. Required if mask_method is not None or conditional is True. + kmperpixel: float, optional + Spatial resolution of the input data (kilometers/pixel). Required if + vel_pert_method is not None or mask_method is 'incremental'. + timestep: float, optional + Time step of the motion vectors (minutes). Required if vel_pert_method is + not None or mask_method is 'incremental'. + extrap_method: str, optional + Name of the extrapolation method to use. See the documentation of + pysteps.extrapolation.interface. + decomp_method: {'fft'}, optional + Name of the cascade decomposition method to use. See the documentation + of pysteps.cascade.interface. + bandpass_filter_method: {'gaussian', 'uniform'}, optional + Name of the bandpass filter method to use with the cascade decomposition. + See the documentation of pysteps.cascade.interface. + noise_method: {'parametric','nonparametric','ssft','nested',None}, optional + Name of the noise generator to use for perturbating the precipitation + field. See the documentation of pysteps.noise.interface. If set to None, + no noise is generated. + noise_stddev_adj: {'auto','fixed',None}, optional + Optional adjustment for the standard deviations of the noise fields added + to each cascade level. This is done to compensate incorrect std. dev. + estimates of casace levels due to presence of no-rain areas. 'auto'=use + the method implemented in pysteps.noise.utils.compute_noise_stddev_adjs. + 'fixed'= use the formula given in :cite:`BPS2006` (eq. 6), None=disable + noise std. dev adjustment. + ar_order: int, optional + The order of the autoregressive model to use. Must be >= 1. + vel_pert_method: {'bps',None}, optional + Name of the noise generator to use for perturbing the advection field. See + the documentation of pysteps.noise.interface. If set to None, the advection + field is not perturbed. + conditional: bool, optional + If set to True, compute the statistics of the precipitation field + conditionally by excluding pixels where the values are below the + threshold precip_thr. + mask_method: {'obs','sprog','incremental',None}, optional + The method to use for masking no precipitation areas in the forecast + field. The masked pixels are set to the minimum value of the observations. + 'obs' = apply precip_thr to the most recently observed precipitation + intensity field, 'sprog' = use the smoothed forecast field from S-PROG, + where the AR(p) model has been applied, 'incremental' = iteratively + buffer the mask with a certain rate (currently it is 1 km/min), + None=no masking. + probmatching_method: {'cdf','mean',None}, optional + Method for matching the statistics of the forecast field with those of + the most recently observed one. 'cdf'=map the forecast CDF to the observed + one, 'mean'=adjust only the conditional mean value of the forecast field + in precipitation areas, None=no matching applied. Using 'mean' requires + that precip_thr and mask_method are not None. + seed: int, optional + Optional seed number for the random generators. + num_workers: int, optional + The number of workers to use for parallel computation. Applicable if dask + is enabled or pyFFTW is used for computing the FFT. When num_workers>1, it + is advisable to disable OpenMP by setting the environment variable + OMP_NUM_THREADS to 1. This avoids slowdown caused by too many simultaneous + threads. + fft_method: str, optional + A string defining the FFT method to use (see utils.fft.get_method). + Defaults to 'numpy' for compatibility reasons. If pyFFTW is installed, + the recommended method is 'pyfftw'. + domain: {"spatial", "spectral"} + If "spatial", all computations are done in the spatial domain (the + classical STEPS model). If "spectral", the AR(2) models and stochastic + perturbations are applied directly in the spectral domain to reduce + memory footprint and improve performance :cite:`PCH2019b`. + extrap_kwargs: dict, optional + Optional dictionary containing keyword arguments for the extrapolation + method. See the documentation of pysteps.extrapolation. + filter_kwargs: dict, optional + Optional dictionary containing keyword arguments for the filter method. + See the documentation of pysteps.cascade.bandpass_filters.py. + noise_kwargs: dict, optional + Optional dictionary containing keyword arguments for the initializer of + the noise generator. See the documentation of pysteps.noise.fftgenerators. + vel_pert_kwargs: dict, optional + Optional dictionary containing keyword arguments 'p_par' and 'p_perp' for + the initializer of the velocity perturbator. The choice of the optimal + parameters depends on the domain and the used optical flow method. + + Default parameters from :cite:`BPS2006`: + p_par = [10.88, 0.23, -7.68] + p_perp = [5.76, 0.31, -2.72] + + Parameters fitted to the data (optical flow/domain): + + darts/fmi: + p_par = [13.71259667, 0.15658963, -16.24368207] + p_perp = [8.26550355, 0.17820458, -9.54107834] + + darts/mch: + p_par = [24.27562298, 0.11297186, -27.30087471] + p_perp = [-7.80797846e+01, -3.38641048e-02, 7.56715304e+01] + + darts/fmi+mch: + p_par = [16.55447057, 0.14160448, -19.24613059] + p_perp = [14.75343395, 0.11785398, -16.26151612] + + lucaskanade/fmi: + p_par = [2.20837526, 0.33887032, -2.48995355] + p_perp = [2.21722634, 0.32359621, -2.57402761] + + lucaskanade/mch: + p_par = [2.56338484, 0.3330941, -2.99714349] + p_perp = [1.31204508, 0.3578426, -1.02499891] + + lucaskanade/fmi+mch: + p_par = [2.31970635, 0.33734287, -2.64972861] + p_perp = [1.90769947, 0.33446594, -2.06603662] + + vet/fmi: + p_par = [0.25337388, 0.67542291, 11.04895538] + p_perp = [0.02432118, 0.99613295, 7.40146505] + + vet/mch: + p_par = [0.5075159, 0.53895212, 7.90331791] + p_perp = [0.68025501, 0.41761289, 4.73793581] + + vet/fmi+mch: + p_par = [0.29495222, 0.62429207, 8.6804131 ] + p_perp = [0.23127377, 0.59010281, 5.98180004] + + fmi=Finland, mch=Switzerland, fmi+mch=both pooled into the same data set + + The above parameters have been fitten by using run_vel_pert_analysis.py + and fit_vel_pert_params.py located in the scripts directory. + + See pysteps.noise.motion for additional documentation. + mask_kwargs: dict + Optional dictionary containing mask keyword arguments 'mask_f' and + 'mask_rim', the factor defining the the mask increment and the rim size, + respectively. + The mask increment is defined as mask_f*timestep/kmperpixel. + measure_time: bool + If set to True, measure, print and return the computation time. + callback: function, optional + Optional function that is called after computation of each time step of + the nowcast. The function takes one argument: a three-dimensional array + of shape (n_ens_members,h,w), where h and w are the height and width + of the input precipitation fields, respectively. This can be used, for + instance, writing the outputs into files. + return_output: bool, optional + Set to False to disable returning the outputs as numpy arrays. This can + save memory if the intermediate results are written to output files using + the callback function. + + Returns + ------- + out: ndarray + If return_output is True, a four-dimensional array of shape + (n_ens_members,num_timesteps,m,n) containing a time series of forecast + precipitation fields for each ensemble member. Otherwise, a None value + is returned. The time series starts from t0+timestep, where timestep is + taken from the input precipitation fields. If measure_time is True, the + return value is a three-element tuple containing the nowcast array, the + initialization time of the nowcast generator and the time used in the + main loop (seconds). + + See also + -------- + pysteps.extrapolation.interface, pysteps.cascade.interface, + pysteps.noise.interface, pysteps.noise.utils.compute_noise_stddev_adjs + + References + ---------- + :cite:`Seed2003`, :cite:`BPS2006`, :cite:`SPN2013`, :cite:`PCH2019b` + """ + + nowcaster_config = StepsNowcasterConfig( + n_ens_members=n_ens_members, + n_cascade_levels=n_cascade_levels, + precip_threshold=precip_thr, + kmperpixel=kmperpixel, + timestep=timestep, + extrapolation_method=extrap_method, + decomposition_method=decomp_method, + bandpass_filter_method=bandpass_filter_method, + noise_method=noise_method, + noise_stddev_adj=noise_stddev_adj, + ar_order=ar_order, + velocity_perturbation_method=vel_pert_method, + conditional=conditional, + probmatching_method=probmatching_method, + mask_method=mask_method, + seed=seed, + num_workers=num_workers, + fft_method=fft_method, + domain=domain, + extrapolation_kwargs=extrap_kwargs, + filter_kwargs=filter_kwargs, + noise_kwargs=noise_kwargs, + velocity_perturbation_kwargs=vel_pert_kwargs, + mask_kwargs=mask_kwargs, + measure_time=measure_time, + callback=callback, + return_output=return_output, + ) + + # Create an instance of the new class with all the provided arguments + nowcaster = StepsNowcaster( + precip, velocity, timesteps, steps_config=nowcaster_config + ) + forecast_steps_nowcast = nowcaster.compute_forecast() + nowcaster.reset_states_and_params() + # Call the appropriate methods within the class + return forecast_steps_nowcast From 50839ff4f20fc23e86a3c21cb70db495ef95bac4 Mon Sep 17 00:00:00 2001 From: Seppo Pulkkinen Date: Wed, 4 Dec 2024 21:42:24 +0200 Subject: [PATCH 2/3] Importers for new OPERA products (#441) * Fix incorrect function name * Skip reading attributes from what group if not found at main level * Add tests for OPERA NIMBUS and CIRRUS products * Add tests for NIMBUS rain accumulation composites * Make quantity attribute in what group optional * Revise function docstrings --------- Co-authored-by: Seppo Pulkkinen --- pysteps/io/importers.py | 13 ++- pysteps/tests/test_io_opera_hdf5.py | 148 ++++++++++++++++++++++++++-- 2 files changed, 148 insertions(+), 13 deletions(-) diff --git a/pysteps/io/importers.py b/pysteps/io/importers.py index 1493b6d4b..9cfaa17f9 100644 --- a/pysteps/io/importers.py +++ b/pysteps/io/importers.py @@ -1304,10 +1304,13 @@ def import_odim_hdf5(filename, qty="RATE", **kwargs): # check if the "what" group is in the "dataset" group if "what" in list(dsg[1].keys()): if "quantity" in dsg[1]["what"].attrs.keys(): - qty_, gain, offset, nodata, undetect = _read_opera_hdf5_what_group( - dsg[1]["what"] - ) - what_grp_found = True + try: + qty_, gain, offset, nodata, undetect = ( + _read_opera_hdf5_what_group(dsg[1]["what"]) + ) + what_grp_found = True + except KeyError: + pass for dg in dsg[1].items(): if dg[0][0:4] == "data": @@ -1480,7 +1483,7 @@ def import_opera_hdf5(filename, qty="RATE", **kwargs): def _read_opera_hdf5_what_group(whatgrp): - qty = whatgrp.attrs["quantity"] + qty = whatgrp.attrs["quantity"] if "quantity" in whatgrp.attrs.keys() else b"QIND" gain = whatgrp.attrs["gain"] if "gain" in whatgrp.attrs.keys() else 1.0 offset = whatgrp.attrs["offset"] if "offset" in whatgrp.attrs.keys() else 0.0 nodata = whatgrp.attrs["nodata"] if "nodata" in whatgrp.attrs.keys() else np.nan diff --git a/pysteps/tests/test_io_opera_hdf5.py b/pysteps/tests/test_io_opera_hdf5.py index 6b2214d49..52f86f7b8 100644 --- a/pysteps/tests/test_io_opera_hdf5.py +++ b/pysteps/tests/test_io_opera_hdf5.py @@ -9,19 +9,57 @@ pytest.importorskip("h5py") +# tests for three OPERA products: +# Odyssey rain rate composite (production discontinued on October 30th 2024) +# CIRRUS max. reflectivity composites +# NIMBUS rain rate composites root_path = pysteps.rcparams.data_sources["opera"]["root_path"] + filename = os.path.join(root_path, "20180824", "T_PAAH21_C_EUOC_20180824180000.hdf") -precip, _, metadata = pysteps.io.import_opera_hdf5(filename) +precip_odyssey, _, metadata_odyssey = pysteps.io.import_opera_hdf5(filename, qty="RATE") + +filename = os.path.join( + root_path, "20241126", "CIRRUS", "T_PABV21_C_EUOC_20241126010000.hdf" +) +precip_cirrus, _, metadata_cirrus = pysteps.io.import_opera_hdf5(filename, qty="DBZH") + +filename = os.path.join( + root_path, "20241126", "NIMBUS", "T_PAAH22_C_EUOC_20241126010000.hdf" +) +precip_nimbus_rain_rate, _, metadata_nimbus_rain_rate = pysteps.io.import_opera_hdf5( + filename, qty="RATE" +) + +filename = os.path.join( + root_path, "20241126", "NIMBUS", "T_PASH22_C_EUOC_20241126010000.hdf" +) +precip_nimbus_rain_accum, _, metadata_nimbus_rain_accum = pysteps.io.import_opera_hdf5( + filename, qty="ACRR" +) -def test_io_import_opera_hdf5_shape(): +def test_io_import_opera_hdf5_odyssey_shape(): """Test the importer OPERA HDF5.""" - assert precip.shape == (2200, 1900) + assert precip_odyssey.shape == (2200, 1900) -# test_metadata: list of (variable,expected, tolerance) tuples +def test_io_import_opera_hdf5_cirrus_shape(): + """Test the importer OPERA HDF5.""" + assert precip_cirrus.shape == (4400, 3800) + +def test_io_import_opera_hdf5_nimbus_rain_rate_shape(): + """Test the importer OPERA HDF5.""" + assert precip_nimbus_rain_rate.shape == (2200, 1900) + + +def test_io_import_opera_hdf5_nimbus_rain_accum_shape(): + """Test the importer OPERA HDF5.""" + assert precip_nimbus_rain_accum.shape == (2200, 1900) + + +# test_metadata: list of (variable,expected, tolerance) tuples expected_proj = ( "+proj=laea +lat_0=55.0 +lon_0=10.0 " "+x_0=1950000.0 " @@ -30,7 +68,7 @@ def test_io_import_opera_hdf5_shape(): ) # list of (variable,expected,tolerance) tuples -test_attrs = [ +test_odyssey_attrs = [ ("projection", expected_proj, None), ("ll_lon", -10.434576838640398, 1e-10), ("ll_lat", 31.746215319325056, 1e-10), @@ -53,7 +91,101 @@ def test_io_import_opera_hdf5_shape(): ] -@pytest.mark.parametrize("variable, expected, tolerance", test_attrs) -def test_io_import_mch_gif_dataset_attrs(variable, expected, tolerance): +@pytest.mark.parametrize("variable, expected, tolerance", test_odyssey_attrs) +def test_io_import_opera_hdf5_odyssey_dataset_attrs(variable, expected, tolerance): """Test the importer OPERA HDF5.""" - smart_assert(metadata[variable], expected, tolerance) + smart_assert(metadata_odyssey[variable], expected, tolerance) + + +# list of (variable,expected,tolerance) tuples +test_cirrus_attrs = [ + ("projection", expected_proj, None), + ("ll_lon", -10.4345768386404, 1e-10), + ("ll_lat", 31.7462153182675, 1e-10), + ("ur_lon", 57.8119647501499, 1e-10), + ("ur_lat", 67.6210371071631, 1e-10), + ("x1", -0.00027143326587975025, 1e-6), + ("y1", -4400000.00116988, 1e-10), + ("x2", 3800000.0000817003, 1e-10), + ("y2", -8.761277422308922e-05, 1e-6), + ("xpixelsize", 1000.0, 1e-10), + ("ypixelsize", 1000.0, 1e-10), + ("cartesian_unit", "m", None), + ("accutime", 15.0, 1e-10), + ("yorigin", "upper", None), + ("unit", "dBZ", None), + ("institution", "Odyssey datacentre", None), + ("transform", "dB", None), + ("zerovalue", -32.0, 1e-10), + ("threshold", -31.5, 1e-10), +] + + +@pytest.mark.parametrize("variable, expected, tolerance", test_cirrus_attrs) +def test_io_import_opera_hdf5_cirrus_dataset_attrs(variable, expected, tolerance): + """Test OPERA HDF5 importer: max. reflectivity composites from CIRRUS.""" + smart_assert(metadata_cirrus[variable], expected, tolerance) + + +# list of (variable,expected,tolerance) tuples +test_nimbus_rain_rate_attrs = [ + ("projection", expected_proj, None), + ("ll_lon", -10.434599999137568, 1e-10), + ("ll_lat", 31.74619995126678, 1e-10), + ("ur_lon", 57.8119032106317, 1e-10), + ("ur_lat", 67.62104536996274, 1e-10), + ("x1", -2.5302714337594807, 1e-6), + ("y1", -4400001.031169886, 1e-10), + ("x2", 3799997.4700817037, 1e-10), + ("y2", -1.0300876162946224, 1e-6), + ("xpixelsize", 2000.0, 1e-10), + ("ypixelsize", 2000.0, 1e-10), + ("cartesian_unit", "m", None), + ("accutime", 15.0, 1e-10), + ("yorigin", "upper", None), + ("unit", "mm/h", None), + ("institution", "Odyssey datacentre", None), + ("transform", None, None), + ("zerovalue", 0.0, 1e-10), + ("threshold", 0.01, 1e-10), +] + + +@pytest.mark.parametrize("variable, expected, tolerance", test_nimbus_rain_rate_attrs) +def test_io_import_opera_hdf5_nimbus_rain_rate_dataset_attrs( + variable, expected, tolerance +): + """Test OPERA HDF5 importer: rain rate composites from NIMBUS.""" + smart_assert(metadata_nimbus_rain_rate[variable], expected, tolerance) + + +# list of (variable,expected,tolerance) tuples +test_nimbus_rain_accum_attrs = [ + ("projection", expected_proj, None), + ("ll_lon", -10.434599999137568, 1e-10), + ("ll_lat", 31.74619995126678, 1e-10), + ("ur_lon", 57.8119032106317, 1e-10), + ("ur_lat", 67.62104536996274, 1e-10), + ("x1", -2.5302714337594807, 1e-6), + ("y1", -4400001.031169886, 1e-10), + ("x2", 3799997.4700817037, 1e-10), + ("y2", -1.0300876162946224, 1e-6), + ("xpixelsize", 2000.0, 1e-10), + ("ypixelsize", 2000.0, 1e-10), + ("cartesian_unit", "m", None), + ("accutime", 15.0, 1e-10), + ("yorigin", "upper", None), + ("unit", "mm", None), + ("institution", "Odyssey datacentre", None), + ("transform", None, None), + ("zerovalue", 0.0, 1e-10), + ("threshold", 0.01, 1e-10), +] + + +@pytest.mark.parametrize("variable, expected, tolerance", test_nimbus_rain_accum_attrs) +def test_io_import_opera_hdf5_nimbus_rain_accum_dataset_attrs( + variable, expected, tolerance +): + """Test OPERA HDF5 importer: rain accumulation composites from NIMBUS.""" + smart_assert(metadata_nimbus_rain_accum[variable], expected, tolerance) From e3325854dae006b1e78eb43a6d2203f2b0b71560 Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Wed, 4 Dec 2024 20:45:15 +0100 Subject: [PATCH 3/3] Bump version --- PKG-INFO | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/PKG-INFO b/PKG-INFO index 1951fa8f5..3b95650fd 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: pysteps -Version: 1.12.0 +Version: 1.13.0 Summary: Python framework for short-term ensemble prediction systems Home-page: http://pypi.python.org/pypi/pysteps/ License: LICENSE diff --git a/setup.py b/setup.py index 09ed608e2..569991919 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ setup( name="pysteps", - version="1.12.0", + version="1.13.0", author="PySteps developers", packages=find_packages(), license="LICENSE",