From 366b9ccc34faf3065a7bf8eeb5b73c74e11d64cb Mon Sep 17 00:00:00 2001 From: Damien Delforge Date: Sat, 9 Nov 2024 18:38:56 +0100 Subject: [PATCH] Add AR model functionality and tests Integrated autoregressive (AR) model functions: scoring, fitting, and surrogate generation. Enhanced the test suite to include comprehensive tests for AR models and updated requirements.txt with joblib and statsmodels dependencies. --- requirements.txt | 2 + tests/conftest.py | 28 +++++- tests/test_math.py | 201 +++++++++++++++++++++++++++++++++++++- vassal/math_ext.py | 233 ++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 458 insertions(+), 6 deletions(-) diff --git a/requirements.txt b/requirements.txt index 810e642..72676cc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,9 @@ +joblib numpy matplotlib pandas scipy scikit-learn +statsmodels # Optional dependencies # dask # for dask available svd methods diff --git a/tests/conftest.py b/tests/conftest.py index d25c9fb..092d5bb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,40 +1,61 @@ -import pytest import numpy as np +import pytest + from vassal.ssa import SingularSpectrumAnalysis + @pytest.fixture def timeseries50(): return np.random.rand(50) + + +@pytest.fixture +def ar1_timeseries50(): + np.random.seed(42) + n = 50 + b = 0.5 + ar1_series = [0] + for _ in range(1, n): + ar1_series.append(b * ar1_series[-1] + np.random.normal()) + return np.array(ar1_series) + + @pytest.fixture def ssa_no_decomposition(timeseries50): ssa = SingularSpectrumAnalysis(timeseries50) return ssa + @pytest.fixture def ssa_np_svd(timeseries50): ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='np_svd') return ssa + @pytest.fixture def ssa_sc_svd(timeseries50): ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='sc_svd') return ssa + @pytest.fixture def ssa_sc_ssvd(timeseries50): ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='sc_ssvd') return ssa + @pytest.fixture def ssa_sk_rsvd(timeseries50): ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='sk_rsvd') return ssa + @pytest.fixture def ssa_da_svd(timeseries50): ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='da_svd') return ssa + @pytest.fixture def ssa_da_csvd(timeseries50): ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='da_csvd') @@ -47,8 +68,9 @@ def ssa_with_decomposition(timeseries50): ssa.decompose(n_components=10) return ssa + @pytest.fixture def ssa_with_reconstruction(ssa_with_decomposition): ssa = ssa_with_decomposition - ssa.reconstruct(groups={'group1': [0,1,2]}) - return ssa \ No newline at end of file + ssa.reconstruct(groups={'group1': [0, 1, 2]}) + return ssa diff --git a/tests/test_math.py b/tests/test_math.py index c14ceeb..3498973 100644 --- a/tests/test_math.py +++ b/tests/test_math.py @@ -7,7 +7,10 @@ construct_SVD_matrix, construct_BK_trajectory_matrix, construct_VG_covariance_matrix, - average_antidiagonals + average_antidiagonals, + autoregressive_model_score, + fit_autoregressive_model, + generate_autoregressive_surrogate ) @@ -16,8 +19,9 @@ def test_correlation_weights(): assert weights.shape[0] == 10 assert np.isfinite(weights).all() + def test_weighted_correlation_matrix(): - series = np.random.randn(5, 10) # 5 timeseries of length 10 + series = np.random.randn(5, 10) # 5 timeseries of length 10 weights = np.ones(10) wcorr = weighted_correlation_matrix(series, weights) assert wcorr.shape == (5, 5) @@ -32,6 +36,7 @@ def test_weighted_correlation_matrix(): assert np.allclose(wcorr, wcorr.T) assert (wcorr >= -1).all() and (wcorr <= 1).all() + def test_construct_SVD_matrix_BK(): ts = np.array([1, 3, 0, -3, -2, -1]) expected_matrix = np.array([ @@ -43,6 +48,7 @@ def test_construct_SVD_matrix_BK(): result = construct_SVD_matrix(ts, window=4, kind='BK') np.testing.assert_array_almost_equal(result, expected_matrix) + def test_construct_SVD_matrix_VG(): ts = np.array([1, 3, 0, -3, -2, -1]) expected_matrix = np.array([ @@ -53,6 +59,7 @@ def test_construct_SVD_matrix_VG(): result = construct_SVD_matrix(ts, window=3, kind='VG') np.testing.assert_array_almost_equal(result, expected_matrix) + def test_construct_BK_trajectory_matrix(): ts = np.array([1, 3, 0, -3, -2, -1]) expected_matrix = np.array([ @@ -64,6 +71,7 @@ def test_construct_BK_trajectory_matrix(): result = construct_BK_trajectory_matrix(ts, window=4) np.testing.assert_array_almost_equal(result, expected_matrix) + def test_construct_VG_covariance_matrix(): ts = np.array([1, 3, 0, -3, -2, -1]) expected_matrix = np.array([ @@ -74,6 +82,7 @@ def test_construct_VG_covariance_matrix(): result = construct_VG_covariance_matrix(ts, window=3) np.testing.assert_array_almost_equal(result, expected_matrix) + def test_construct_SVD_matrix_invalid_kind(): ts = np.array([1, 3, 0, -3, -2, -1]) with pytest.raises( @@ -82,6 +91,7 @@ def test_construct_SVD_matrix_invalid_kind(): "'BK_covariance', or 'VG_covariance'"): construct_SVD_matrix(ts, window=3, kind='invalid') + def test_construct_SVD_matrix_default_window(): ts = np.array([1, 2, 3, 4, 5, 6]) expected_matrix = np.array([ @@ -92,12 +102,14 @@ def test_construct_SVD_matrix_default_window(): result = construct_SVD_matrix(ts, kind='BK') np.testing.assert_array_almost_equal(result, expected_matrix) + def test_average_antidiagonals(): matrix = np.arange(12).reshape(4, 3) expected_timeseries = np.array([0., 2., 4., 7., 9., 11.]) result = average_antidiagonals(matrix) np.testing.assert_array_almost_equal(result, expected_timeseries) + def test_average_antidiagonals_square_matrix(): matrix = np.array([ [1, 2, 3], @@ -108,21 +120,206 @@ def test_average_antidiagonals_square_matrix(): result = average_antidiagonals(matrix) np.testing.assert_array_almost_equal(result, expected_timeseries) + def test_average_antidiagonals_single_row(): matrix = np.array([[1, 2, 3]]) expected_timeseries = np.array([1., 2., 3.]) result = average_antidiagonals(matrix) np.testing.assert_array_almost_equal(result, expected_timeseries) + def test_average_antidiagonals_single_column(): matrix = np.array([[1], [2], [3]]) expected_timeseries = np.array([1., 2., 3.]) result = average_antidiagonals(matrix) np.testing.assert_array_almost_equal(result, expected_timeseries) + def test_average_antidiagonals_empty(): matrix = np.array([[]]) expected_timeseries = np.array([]) result = average_antidiagonals(matrix) np.testing.assert_array_almost_equal(result, expected_timeseries) + +def test_autoregressive_model_score_valid_ar0(ar1_timeseries50): + timeseries = ar1_timeseries50 + order = 0 # white noise should work as well + score = autoregressive_model_score(timeseries, order) + assert isinstance(score, float) + + +def test_autoregressive_model_score_valid_bic(ar1_timeseries50): + timeseries = ar1_timeseries50 + order = 1 + score = autoregressive_model_score(timeseries, order, criterion='bic') + assert isinstance(score, float) + + +def test_autoregressive_model_score_valid_aic(ar1_timeseries50): + timeseries = ar1_timeseries50 + order = 1 + score = autoregressive_model_score(timeseries, order, criterion='aic') + assert isinstance(score, float) + + +def test_autoregressive_model_score_empty_timeseries(): + timeseries = [] + order = 1 + with pytest.raises(ValueError, match="timeseries cannot be empty"): + autoregressive_model_score(timeseries, order) + + +def test_autoregressive_model_score_negative_order(ar1_timeseries50): + timeseries = ar1_timeseries50 + order = -1 + with pytest.raises(ValueError, match="order must be non-negative"): + autoregressive_model_score(timeseries, order) + + +def test_autoregressive_model_score_order_greater_than_length(): + timeseries = [1.0, 2.0, 3.0, 4.0, 5.0] + order = 10 + with pytest.raises(ValueError, + match="timeseries length must be greater than order"): + autoregressive_model_score(timeseries, order) + + +def test_fit_autoregressive_model_default(ar1_timeseries50): + timeseries = ar1_timeseries50 + model = fit_autoregressive_model(timeseries) + expected_params = np.array([0.599205, 0.827014]) + np.testing.assert_allclose(model.params, expected_params, atol=1e-5) + + +def test_fit_autoregressive_model_aic(ar1_timeseries50): + timeseries = ar1_timeseries50 + model = fit_autoregressive_model(timeseries, criterion='aic') + assert hasattr(model, 'aic') + assert hasattr(model, 'bic') + + +def test_fit_autoregressive_model_specified_max_order(ar1_timeseries50): + timeseries = ar1_timeseries50 + model = fit_autoregressive_model(timeseries, max_order=5) + assert hasattr(model, 'aic') + assert hasattr(model, 'bic') + + +def test_fit_autoregressive_model_empty_timeseries(): + timeseries = [] + with pytest.raises(ValueError, + match="timeseries length must be greater than max_order"): + fit_autoregressive_model(timeseries) + + +def test_fit_autoregressive_model_max_order_greater_than_length(): + timeseries = [1.0, 2.0, 3.0, 4.0, 5.0] + with pytest.raises(ValueError, + match="timeseries length must be greater than max_order"): + fit_autoregressive_model(timeseries, max_order=10) + + +def test_fit_autoregressive_model_negative_max_order(ar1_timeseries50): + timeseries = ar1_timeseries50 + with pytest.raises(ValueError, match="max_order must be non-negative"): + fit_autoregressive_model(timeseries, max_order=-1) + + +def test_fit_autoregressive_model_non_integer_max_order(ar1_timeseries50): + timeseries = ar1_timeseries50 + with pytest.raises(TypeError, match="max_order must be an integer"): + fit_autoregressive_model(timeseries, max_order=1.5) + + +def test_fit_autoregressive_model_parallel_jobs(ar1_timeseries50): + timeseries = ar1_timeseries50 + model = fit_autoregressive_model(timeseries, n_jobs=2) + assert hasattr(model, 'aic') + assert hasattr(model, 'bic') + + +def test_generate_ar_surrogate_valid_parameters(): + ar_coefficients = [1, -0.5] + n_samples = 100 + scale = 1.0 + surrogate = generate_autoregressive_surrogate(ar_coefficients, n_samples, + scale) + assert isinstance(surrogate, np.ndarray) + assert len(surrogate) == n_samples + model = fit_autoregressive_model(surrogate) + + np.testing.assert_allclose(-model.params[0], ar_coefficients[-1], atol=1e-1) + np.testing.assert_allclose(model.params[-1], scale, atol=1e-1) + +def test_generate_ar_surrogate_with_seed(): + ar_coefficients = [1, -0.5] + n_samples = 10 + scale = 1.0 + seed = 42 + surrogate_1 = generate_autoregressive_surrogate(ar_coefficients, n_samples, + scale, seed) + surrogate_2 = generate_autoregressive_surrogate(ar_coefficients, n_samples, + scale, seed) + assert np.array_equal(surrogate_1, surrogate_2) + + +def test_generate_ar_surrogate_zero_samples(): + ar_coefficients = [1, -0.5] + n_samples = 0 + scale = 1.0 + with pytest.raises(ValueError, match="n_samples must be positive"): + generate_autoregressive_surrogate(ar_coefficients, n_samples, scale) + + +def test_generate_ar_surrogate_negative_samples(): + ar_coefficients = [1, -0.5] + n_samples = -10 + scale = 1.0 + with pytest.raises(ValueError, match="n_samples must be positive"): + generate_autoregressive_surrogate(ar_coefficients, n_samples, scale) + + +def test_generate_ar_surrogate_zero_scale(): + ar_coefficients = [1, -0.5] + n_samples = 100 + scale = 0.0 + with pytest.raises(ValueError, match="scale must be positive"): + generate_autoregressive_surrogate(ar_coefficients, n_samples, scale) + + +def test_generate_ar_surrogate_negative_scale(): + ar_coefficients = [1, -0.5] + n_samples = 100 + scale = -1.0 + with pytest.raises(ValueError, match="scale must be positive"): + generate_autoregressive_surrogate(ar_coefficients, n_samples, scale) + + +def test_generate_ar_surrogate_empty_ar_coefficients(): + ar_coefficients = [] + n_samples = 100 + scale = 1.0 + with pytest.raises(ValueError, match="ar_coefficients must not be empty"): + generate_autoregressive_surrogate(ar_coefficients, n_samples, scale) + + +def test_generate_ar_surrogate_first_coeff_not_one(): + ar_coefficients = [0.5, -0.5] + n_samples = 100 + scale = 1.0 + with pytest.raises(ValueError, match="First AR coefficient must be 1"): + generate_autoregressive_surrogate(ar_coefficients, n_samples, scale) + + +def test_generate_ar_surrogate_burnin_effect(): + ar_coefficients = [1, -0.5] + n_samples = 100 + scale = 1.0 + seed = 42 + surrogate_without_burnin = generate_autoregressive_surrogate( + ar_coefficients, n_samples, scale, seed, burnin=0) + surrogate_with_burnin = generate_autoregressive_surrogate(ar_coefficients, + n_samples, scale, + seed, burnin=100) + assert not np.array_equal(surrogate_without_burnin, surrogate_with_burnin) diff --git a/vassal/math_ext.py b/vassal/math_ext.py index 3beef11..7a8226b 100644 --- a/vassal/math_ext.py +++ b/vassal/math_ext.py @@ -5,10 +5,14 @@ # # License: BSD 3 clause -from typing import Sequence +from typing import Literal, Sequence import numpy as np +import statsmodels.api as sm +from joblib import Parallel, delayed from scipy.linalg import toeplitz +from statsmodels.tsa.arima_process import arma_generate_sample +from statsmodels.tsa.statespace.sarimax import SARIMAXResults def correlation_weights( @@ -299,6 +303,233 @@ def average_antidiagonals(matrix: np.ndarray) -> np.ndarray: return timeseries +def autoregressive_model_score( + timeseries: Sequence[float], + order: int, + criterion: Literal['aic', 'bic'] = 'bic', +): + """ + Compute the information criterion score for an autoregressive model. + + This function fits an autoregressive (AR) model of a specified order to the + given time series data and computes the selected information criterion + score ('aic' or 'bic'). The AR model is fitted using the `SARIMAX` class + from the `statsmodels` library, with no differencing or moving average + components. + + Parameters + ---------- + timeseries : Sequence[float] + The time series data to which the autoregressive model is to be fitted. + order : int + The order of the autoregressive model. + criterion : {'aic', 'bic'}, optional + The information criterion used to evaluate the model. Either 'aic' + (Akaike Information Criterion) or 'bic' (Bayesian Information Criterion). + Default is 'bic'. + + Returns + ------- + score : float + The computed information criterion score for the fitted autoregressive + model. + + References + ---------- + .. [1] "statsmodels.tsa.statespace.sarimax.SARIMAX" `statsmodels` documentation. + https://www.statsmodels.org/devel/generated/statsmodels.tsa.statespace.sarimax.SARIMAX.html + """ + # Input validation + if len(timeseries) == 0: # Check for empty sequence + raise ValueError("timeseries cannot be empty") + if order < 0: + raise ValueError("order must be non-negative") + if len(timeseries) <= order: + raise ValueError("timeseries length must be greater than order") + + + arp = sm.tsa.statespace.SARIMAX( + timeseries, + order=(order, 0, 0), + trend=None + ).fit() + + if criterion == 'bic': + score = arp.bic + elif criterion == 'aic': + score = arp.aic + + return score + + +def fit_autoregressive_model( + timeseries: Sequence[float], + max_order: int = 1, + criterion: Literal['aic', 'bic'] = 'bic', + n_jobs: int | None = None, +) -> SARIMAXResults: + """ + Fits an autoregressive model to the given time series data using the + specified criterion to select the best order. + + This function evaluates autoregressive models of orders ranging from 0 to + `max_order` and selects the model that minimizes the specified information + criterion ('aic' or 'bic'). + + The fitting process can be parallelized across multiple CPU cores if + `n_jobs` is specified. + + Parameters + ---------- + timeseries : Sequence[float] + The time series data to which the autoregressive model is to be fitted. + max_order : int, optional + The maximum order of the autoregressive model to be evaluated. Default + is 1. + criterion : {'aic', 'bic'}, optional + The information criterion used to select the best model. Default is + 'bic'. + n_jobs : int or None, optional + The number of CPU cores to use for parallel processing. If None, all + available cores are used. If -1, also uses all available cores. Default + is None. + + Returns + ------- + autoregressive_model : statsmodels.tsa.statespace.sarimax.SARIMAXResults + The fitted SARIMAX model object from the `statsmodels` library. + + References + ---------- + .. [1] "statsmodels.tsa.statespace.sarimax.SARIMAX" `statsmodels` documentation. + https://www.statsmodels.org/devel/generated/statsmodels.tsa.statespace.sarimax.SARIMAX.html + + """ + + if max_order < 0: + raise ValueError("max_order must be non-negative") + if not isinstance(max_order, int): + raise TypeError("max_order must be an integer") + if len(timeseries) <= max_order: + raise ValueError("timeseries length must be greater than max_order") + + if n_jobs is None: + n_jobs = -1 + order_list = list(range(max_order + 1)) + model_scores = Parallel(n_jobs=n_jobs)( + delayed(autoregressive_model_score)( + timeseries, + order, + criterion + ) for order in order_list + ) + best_order = order_list[np.argmin(model_scores)] + autoregressive_model = sm.tsa.statespace.SARIMAX( + timeseries, + order=(best_order, 0, 0), + trend=None + ).fit() + + return autoregressive_model + + +def generate_autoregressive_surrogate( + ar_coefficients: Sequence[float], + n_samples: int, + scale: float, + seed: int | None = None, + burnin: int = 100 +): + """ + Generate a surrogate time series using an autoregressive (AR) process. + + This function generates an autoregressive surrogate time series based on + specified AR coefficients. + + Parameters + ---------- + ar_coefficients : Sequence[float] + The coefficient for autoregressive lag polynomial, including zero lag. + n_samples : int + The number of samples to generate in the surrogate time series. + scale : float + The standard deviation of the white noise component added to the AR model. + seed : int or None, optional + Random seed for reproducibility. If `None`, the random number generator + is not seeded. + burnin : int + Number of initial samples to discard to reduce the effect of initial + conditions. Default is 100. + + Raises + ------ + ValueError + If max_order is negative or greater than the length of timeseries + TypeError + If max_order is not an integer + + Returns + ------- + np.ndarray + An array containing the generated autoregressive surrogate time series. + + Notes + ----- + + - The function uses `statsmodels.tsa.arima_process.arma_generate_sample` + [1]_ to generate the AR time series. + - As noted in [1]_, the AR components should include the coefficient on the + zero-lag. This is typically 1. Further, the AR parameters should have the + opposite sign of what you might expect. See the examples below. + - Standardizing the generated series helps in preventing any scale-related + issues in further analysis. + - The function sets a burn-in period of 100 samples to mitigate the + influence of initial conditions. + + References + ---------- + .. [1] "ARMA Process." `statsmodels` documentation. + https://www.statsmodels.org/devel/generated/statsmodels.tsa.arima_process.arma_generate_sample.html + + Examples + -------- + + >>> ar_coefficients = [1, -0.9] + >>> n_samples = 5 + >>> scale = 1.0 + >>> seed = 42 + >>> surrogate = generate_autoregressive_surrogate(ar_coefficients, n_samples, scale, seed) + >>> print(surrogate) + [ 1.28271453 0.6648326 0.36050652 -1.39807629 -0.90997736] + + Raises + ------ + ValueError + If n_samples or scale is not positive, or if ar_coefficients is empty + or doesn't start with 1. + """ + if n_samples <= 0: + raise ValueError("n_samples must be positive") + if scale <= 0: + raise ValueError("scale must be positive") + if not ar_coefficients: + raise ValueError("ar_coefficients must not be empty") + if ar_coefficients[0] != 1: + raise ValueError("First AR coefficient must be 1") + + if seed is not None: + np.random.seed(seed) + surrogate = arma_generate_sample( + ar_coefficients, + [1], + n_samples, + scale=scale, + burnin=burnin + ) + + return surrogate + + if __name__ == '__main__': import doctest