Skip to content

Commit

Permalink
Add AR model functionality and tests
Browse files Browse the repository at this point in the history
Integrated autoregressive (AR) model functions: scoring, fitting, and surrogate generation. Enhanced the test suite to include comprehensive tests for AR models and updated requirements.txt with joblib and statsmodels dependencies.
  • Loading branch information
dadelforge committed Nov 9, 2024
1 parent 8c6d7fd commit 366b9cc
Show file tree
Hide file tree
Showing 4 changed files with 458 additions and 6 deletions.
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
joblib
numpy
matplotlib
pandas
scipy
scikit-learn
statsmodels
# Optional dependencies
# dask # for dask available svd methods
28 changes: 25 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,61 @@
import pytest
import numpy as np
import pytest

from vassal.ssa import SingularSpectrumAnalysis


@pytest.fixture
def timeseries50():
return np.random.rand(50)


@pytest.fixture
def ar1_timeseries50():
np.random.seed(42)
n = 50
b = 0.5
ar1_series = [0]
for _ in range(1, n):
ar1_series.append(b * ar1_series[-1] + np.random.normal())
return np.array(ar1_series)


@pytest.fixture
def ssa_no_decomposition(timeseries50):
ssa = SingularSpectrumAnalysis(timeseries50)
return ssa


@pytest.fixture
def ssa_np_svd(timeseries50):
ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='np_svd')
return ssa


@pytest.fixture
def ssa_sc_svd(timeseries50):
ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='sc_svd')
return ssa


@pytest.fixture
def ssa_sc_ssvd(timeseries50):
ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='sc_ssvd')
return ssa


@pytest.fixture
def ssa_sk_rsvd(timeseries50):
ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='sk_rsvd')
return ssa


@pytest.fixture
def ssa_da_svd(timeseries50):
ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='da_svd')
return ssa


@pytest.fixture
def ssa_da_csvd(timeseries50):
ssa = SingularSpectrumAnalysis(timeseries50, svd_solver='da_csvd')
Expand All @@ -47,8 +68,9 @@ def ssa_with_decomposition(timeseries50):
ssa.decompose(n_components=10)
return ssa


@pytest.fixture
def ssa_with_reconstruction(ssa_with_decomposition):
ssa = ssa_with_decomposition
ssa.reconstruct(groups={'group1': [0,1,2]})
return ssa
ssa.reconstruct(groups={'group1': [0, 1, 2]})
return ssa
201 changes: 199 additions & 2 deletions tests/test_math.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
construct_SVD_matrix,
construct_BK_trajectory_matrix,
construct_VG_covariance_matrix,
average_antidiagonals
average_antidiagonals,
autoregressive_model_score,
fit_autoregressive_model,
generate_autoregressive_surrogate
)


Expand All @@ -16,8 +19,9 @@ def test_correlation_weights():
assert weights.shape[0] == 10
assert np.isfinite(weights).all()


def test_weighted_correlation_matrix():
series = np.random.randn(5, 10) # 5 timeseries of length 10
series = np.random.randn(5, 10) # 5 timeseries of length 10
weights = np.ones(10)
wcorr = weighted_correlation_matrix(series, weights)
assert wcorr.shape == (5, 5)
Expand All @@ -32,6 +36,7 @@ def test_weighted_correlation_matrix():
assert np.allclose(wcorr, wcorr.T)
assert (wcorr >= -1).all() and (wcorr <= 1).all()


def test_construct_SVD_matrix_BK():
ts = np.array([1, 3, 0, -3, -2, -1])
expected_matrix = np.array([
Expand All @@ -43,6 +48,7 @@ def test_construct_SVD_matrix_BK():
result = construct_SVD_matrix(ts, window=4, kind='BK')
np.testing.assert_array_almost_equal(result, expected_matrix)


def test_construct_SVD_matrix_VG():
ts = np.array([1, 3, 0, -3, -2, -1])
expected_matrix = np.array([
Expand All @@ -53,6 +59,7 @@ def test_construct_SVD_matrix_VG():
result = construct_SVD_matrix(ts, window=3, kind='VG')
np.testing.assert_array_almost_equal(result, expected_matrix)


def test_construct_BK_trajectory_matrix():
ts = np.array([1, 3, 0, -3, -2, -1])
expected_matrix = np.array([
Expand All @@ -64,6 +71,7 @@ def test_construct_BK_trajectory_matrix():
result = construct_BK_trajectory_matrix(ts, window=4)
np.testing.assert_array_almost_equal(result, expected_matrix)


def test_construct_VG_covariance_matrix():
ts = np.array([1, 3, 0, -3, -2, -1])
expected_matrix = np.array([
Expand All @@ -74,6 +82,7 @@ def test_construct_VG_covariance_matrix():
result = construct_VG_covariance_matrix(ts, window=3)
np.testing.assert_array_almost_equal(result, expected_matrix)


def test_construct_SVD_matrix_invalid_kind():
ts = np.array([1, 3, 0, -3, -2, -1])
with pytest.raises(
Expand All @@ -82,6 +91,7 @@ def test_construct_SVD_matrix_invalid_kind():
"'BK_covariance', or 'VG_covariance'"):
construct_SVD_matrix(ts, window=3, kind='invalid')


def test_construct_SVD_matrix_default_window():
ts = np.array([1, 2, 3, 4, 5, 6])
expected_matrix = np.array([
Expand All @@ -92,12 +102,14 @@ def test_construct_SVD_matrix_default_window():
result = construct_SVD_matrix(ts, kind='BK')
np.testing.assert_array_almost_equal(result, expected_matrix)


def test_average_antidiagonals():
matrix = np.arange(12).reshape(4, 3)
expected_timeseries = np.array([0., 2., 4., 7., 9., 11.])
result = average_antidiagonals(matrix)
np.testing.assert_array_almost_equal(result, expected_timeseries)


def test_average_antidiagonals_square_matrix():
matrix = np.array([
[1, 2, 3],
Expand All @@ -108,21 +120,206 @@ def test_average_antidiagonals_square_matrix():
result = average_antidiagonals(matrix)
np.testing.assert_array_almost_equal(result, expected_timeseries)


def test_average_antidiagonals_single_row():
matrix = np.array([[1, 2, 3]])
expected_timeseries = np.array([1., 2., 3.])
result = average_antidiagonals(matrix)
np.testing.assert_array_almost_equal(result, expected_timeseries)


def test_average_antidiagonals_single_column():
matrix = np.array([[1], [2], [3]])
expected_timeseries = np.array([1., 2., 3.])
result = average_antidiagonals(matrix)
np.testing.assert_array_almost_equal(result, expected_timeseries)


def test_average_antidiagonals_empty():
matrix = np.array([[]])
expected_timeseries = np.array([])
result = average_antidiagonals(matrix)
np.testing.assert_array_almost_equal(result, expected_timeseries)


def test_autoregressive_model_score_valid_ar0(ar1_timeseries50):
timeseries = ar1_timeseries50
order = 0 # white noise should work as well
score = autoregressive_model_score(timeseries, order)
assert isinstance(score, float)


def test_autoregressive_model_score_valid_bic(ar1_timeseries50):
timeseries = ar1_timeseries50
order = 1
score = autoregressive_model_score(timeseries, order, criterion='bic')
assert isinstance(score, float)


def test_autoregressive_model_score_valid_aic(ar1_timeseries50):
timeseries = ar1_timeseries50
order = 1
score = autoregressive_model_score(timeseries, order, criterion='aic')
assert isinstance(score, float)


def test_autoregressive_model_score_empty_timeseries():
timeseries = []
order = 1
with pytest.raises(ValueError, match="timeseries cannot be empty"):
autoregressive_model_score(timeseries, order)


def test_autoregressive_model_score_negative_order(ar1_timeseries50):
timeseries = ar1_timeseries50
order = -1
with pytest.raises(ValueError, match="order must be non-negative"):
autoregressive_model_score(timeseries, order)


def test_autoregressive_model_score_order_greater_than_length():
timeseries = [1.0, 2.0, 3.0, 4.0, 5.0]
order = 10
with pytest.raises(ValueError,
match="timeseries length must be greater than order"):
autoregressive_model_score(timeseries, order)


def test_fit_autoregressive_model_default(ar1_timeseries50):
timeseries = ar1_timeseries50
model = fit_autoregressive_model(timeseries)
expected_params = np.array([0.599205, 0.827014])
np.testing.assert_allclose(model.params, expected_params, atol=1e-5)


def test_fit_autoregressive_model_aic(ar1_timeseries50):
timeseries = ar1_timeseries50
model = fit_autoregressive_model(timeseries, criterion='aic')
assert hasattr(model, 'aic')
assert hasattr(model, 'bic')


def test_fit_autoregressive_model_specified_max_order(ar1_timeseries50):
timeseries = ar1_timeseries50
model = fit_autoregressive_model(timeseries, max_order=5)
assert hasattr(model, 'aic')
assert hasattr(model, 'bic')


def test_fit_autoregressive_model_empty_timeseries():
timeseries = []
with pytest.raises(ValueError,
match="timeseries length must be greater than max_order"):
fit_autoregressive_model(timeseries)


def test_fit_autoregressive_model_max_order_greater_than_length():
timeseries = [1.0, 2.0, 3.0, 4.0, 5.0]
with pytest.raises(ValueError,
match="timeseries length must be greater than max_order"):
fit_autoregressive_model(timeseries, max_order=10)


def test_fit_autoregressive_model_negative_max_order(ar1_timeseries50):
timeseries = ar1_timeseries50
with pytest.raises(ValueError, match="max_order must be non-negative"):
fit_autoregressive_model(timeseries, max_order=-1)


def test_fit_autoregressive_model_non_integer_max_order(ar1_timeseries50):
timeseries = ar1_timeseries50
with pytest.raises(TypeError, match="max_order must be an integer"):
fit_autoregressive_model(timeseries, max_order=1.5)


def test_fit_autoregressive_model_parallel_jobs(ar1_timeseries50):
timeseries = ar1_timeseries50
model = fit_autoregressive_model(timeseries, n_jobs=2)
assert hasattr(model, 'aic')
assert hasattr(model, 'bic')


def test_generate_ar_surrogate_valid_parameters():
ar_coefficients = [1, -0.5]
n_samples = 100
scale = 1.0
surrogate = generate_autoregressive_surrogate(ar_coefficients, n_samples,
scale)
assert isinstance(surrogate, np.ndarray)
assert len(surrogate) == n_samples
model = fit_autoregressive_model(surrogate)

np.testing.assert_allclose(-model.params[0], ar_coefficients[-1], atol=1e-1)
np.testing.assert_allclose(model.params[-1], scale, atol=1e-1)

def test_generate_ar_surrogate_with_seed():
ar_coefficients = [1, -0.5]
n_samples = 10
scale = 1.0
seed = 42
surrogate_1 = generate_autoregressive_surrogate(ar_coefficients, n_samples,
scale, seed)
surrogate_2 = generate_autoregressive_surrogate(ar_coefficients, n_samples,
scale, seed)
assert np.array_equal(surrogate_1, surrogate_2)


def test_generate_ar_surrogate_zero_samples():
ar_coefficients = [1, -0.5]
n_samples = 0
scale = 1.0
with pytest.raises(ValueError, match="n_samples must be positive"):
generate_autoregressive_surrogate(ar_coefficients, n_samples, scale)


def test_generate_ar_surrogate_negative_samples():
ar_coefficients = [1, -0.5]
n_samples = -10
scale = 1.0
with pytest.raises(ValueError, match="n_samples must be positive"):
generate_autoregressive_surrogate(ar_coefficients, n_samples, scale)


def test_generate_ar_surrogate_zero_scale():
ar_coefficients = [1, -0.5]
n_samples = 100
scale = 0.0
with pytest.raises(ValueError, match="scale must be positive"):
generate_autoregressive_surrogate(ar_coefficients, n_samples, scale)


def test_generate_ar_surrogate_negative_scale():
ar_coefficients = [1, -0.5]
n_samples = 100
scale = -1.0
with pytest.raises(ValueError, match="scale must be positive"):
generate_autoregressive_surrogate(ar_coefficients, n_samples, scale)


def test_generate_ar_surrogate_empty_ar_coefficients():
ar_coefficients = []
n_samples = 100
scale = 1.0
with pytest.raises(ValueError, match="ar_coefficients must not be empty"):
generate_autoregressive_surrogate(ar_coefficients, n_samples, scale)


def test_generate_ar_surrogate_first_coeff_not_one():
ar_coefficients = [0.5, -0.5]
n_samples = 100
scale = 1.0
with pytest.raises(ValueError, match="First AR coefficient must be 1"):
generate_autoregressive_surrogate(ar_coefficients, n_samples, scale)


def test_generate_ar_surrogate_burnin_effect():
ar_coefficients = [1, -0.5]
n_samples = 100
scale = 1.0
seed = 42
surrogate_without_burnin = generate_autoregressive_surrogate(
ar_coefficients, n_samples, scale, seed, burnin=0)
surrogate_with_burnin = generate_autoregressive_surrogate(ar_coefficients,
n_samples, scale,
seed, burnin=100)
assert not np.array_equal(surrogate_without_burnin, surrogate_with_burnin)
Loading

0 comments on commit 366b9cc

Please sign in to comment.