Skip to content

Commit

Permalink
More work and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bashtage committed Apr 2, 2020
1 parent cc62df7 commit a3e0951
Show file tree
Hide file tree
Showing 10 changed files with 1,385 additions and 1,237 deletions.
1 change: 1 addition & 0 deletions arch/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pytest_plugins = [
"arch.tests.unitroot.cointegration_data",
"arch.tests.covariance.covariance_data",
]


Expand Down
1 change: 1 addition & 0 deletions arch/covariance/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class CovarianceEstimator(ABC):
def __init__(
self,
x: ArrayLike,
*,
bandwidth: Optional[float] = None,
df_adjust: int = 0,
center: bool = True,
Expand Down
32 changes: 22 additions & 10 deletions arch/covariance/var.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Dict, NamedTuple, Optional, Tuple

import numpy as np
from numpy import zeros
from numpy.linalg import lstsq
import pandas as pd
from statsmodels.tools import add_constant
Expand Down Expand Up @@ -38,6 +37,7 @@ class PreWhitenRecoloredCovariance(CovarianceEstimator):
df_adjust : int, default 0
center : bool, default True
weights : array_like, default None
force_int : bool, default False
See Also
--------
Expand All @@ -52,6 +52,7 @@ class PreWhitenRecoloredCovariance(CovarianceEstimator):
def __init__(
self,
x: ArrayLike,
*,
lags: Optional[int] = None,
method: str = "aic",
diagonal: bool = True,
Expand All @@ -62,9 +63,15 @@ def __init__(
df_adjust: int = 0,
center: bool = True,
weights: Optional[ArrayLike] = None,
force_int: bool = False,
) -> None:
super().__init__(
x, bandwidth=bandwidth, df_adjust=df_adjust, center=center, weights=weights
x,
bandwidth=bandwidth,
df_adjust=df_adjust,
center=center,
weights=weights,
force_int=force_int,
)
self._kernel_name = kernel
self._lags = 0
Expand Down Expand Up @@ -155,7 +162,7 @@ def _ic_from_vars(
ics: Dict[Tuple[int, int], float] = {
(full_order, full_order): self._ic(sigma, nparam, nobs)
}
if not self._diagonal:
if not self._diagonal or self._x.shape[1] == 1:
return ics

purged_indiv_lags = np.empty((nvar, nobs, max_lag - full_order))
Expand Down Expand Up @@ -214,7 +221,7 @@ def _estimate_var(self, full_order: int, diag_order: int) -> VARModel:
rhs = rhs[:, : c + full_order * nvar]
extra_lags = extra_lags[:, :, full_order:diag_order]

params = zeros((nvar, nvar * max_lag + center))
params = np.zeros((nvar, nvar * max_lag + center))
resids = np.empty_like(lhs)
ncommon = rhs.shape[1]
for i in range(nvar):
Expand Down Expand Up @@ -246,8 +253,8 @@ def _estimate_sample_cov(self, nvar: int, nlag: int) -> NDArray:
if self._center:
x = x - x.mean(0)
nobs = x.shape[0]
var_cov = zeros((nvar * nlag, nvar * nlag))
gamma = zeros((nlag, nvar, nvar))
var_cov = np.zeros((nvar * nlag, nvar * nlag))
gamma = np.zeros((nlag, nvar, nvar))
for i in range(nlag):
gamma[i] = (x[i:].T @ x[: (nobs - i)]) / nobs
for r in range(nlag):
Expand All @@ -261,7 +268,7 @@ def _estimate_sample_cov(self, nvar: int, nlag: int) -> NDArray:
def _estimate_model_cov(
self, nvar: int, nlag: int, coeffs: NDArray, short_run: NDArray
) -> NDArray:
sigma = zeros((nvar * nlag, nvar * nlag))
sigma = np.zeros((nvar * nlag, nvar * nlag))
sigma[:nvar, :nvar] = short_run
multiplier = np.linalg.inv(np.eye(coeffs.size) - np.kron(coeffs, coeffs))
vec_sigma = sigma.ravel()[:, None]
Expand All @@ -274,7 +281,7 @@ def _companion_form(
) -> Tuple[NDArray, NDArray]:
nvar = var_model.resids.shape[1]
nlag = var_model.var_order
coeffs = zeros((nvar * nlag, nvar * nlag))
coeffs = np.zeros((nvar * nlag, nvar * nlag))
coeffs[:nvar] = var_model.params[:, var_model.intercept :]
for i in range(nlag - 1):
coeffs[(i + 1) * nvar : (i + 2) * nvar, i * nvar : (i + 1) * nvar] = np.eye(
Expand All @@ -294,7 +301,12 @@ def cov(self) -> CovarianceEstimate:
resids = var_mod.resids
nobs, nvar = resids.shape
self._kernel_instance = self._kernel(
resids, self._bandwidth, 0, False, self._x_weights, self._force_int
resids,
bandwidth=self._bandwidth,
df_adjust=0,
center=False,
weights=self._x_weights,
force_int=self._force_int,
)
kern_cov = self._kernel_instance.cov
short_run = kern_cov.short_run
Expand All @@ -316,7 +328,7 @@ def cov(self) -> CovarianceEstimate:
have diagonal coefficient matrices. The maximum eigenvalue of the companion-form \
VAR(1) coefficient matrix is {max_eig}."""
)
coeff_sum = zeros((nvar, nvar))
coeff_sum = np.zeros((nvar, nvar))
params = var_mod.params[:, var_mod.intercept :]
for i in range(var_mod.var_order):
coeff_sum += params[:, i * nvar : (i + 1) * nvar]
Expand Down
45 changes: 45 additions & 0 deletions arch/tests/covariance/covariance_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from itertools import product

import numpy as np
import pandas as pd
import pytest

# Every (dimension, use-pandas, VAR order) combination exercised by the fixture.
DATA_PARAMS = [
    combo for combo in product((1, 3), (True, False), (0, 1, 3))
]
# Human-readable pytest ids, one per parameter combination.
DATA_IDS = [
    f"dim: {d}, pandas: {p}, order: {o}" for (d, p, o) in DATA_PARAMS
]


@pytest.fixture(scope="module", params=DATA_PARAMS, ids=DATA_IDS)
def covariance_data(request):
    """
    Simulate data from a VAR process for the covariance estimator tests.

    Parameters
    ----------
    request
        The pytest request object. ``request.param`` is a tuple
        ``(dim, pandas, order)`` holding the number of series, whether to
        return a pandas object, and the order of the VAR used to generate
        the data.

    Returns
    -------
    {ndarray, Series, DataFrame}
        500 simulated observations (a 100-observation burn-in is dropped).
        A 1-d array or ``Series`` named "x" when ``dim`` is 1; otherwise a
        2-d array or a ``DataFrame`` with columns "x0", "x1", ...
    """
    dim, use_pandas, order = request.param
    # Fixed seed so that every test sees identical data
    rs = np.random.RandomState([839084, 3823810, 982103, 829108])
    burn = 100
    nobs = burn + 500
    # The number of columns must match dim so that the (dim, dim)
    # coefficient matrices below conform with each row of rvs
    shape = (nobs, dim) if dim > 1 else (nobs,)
    rvs = rs.standard_normal(shape)
    phi = np.zeros((order, dim, dim))
    if order > 0:
        # First lag: 0.5 on the diagonal, 0.1 off the diagonal
        phi[0] = np.eye(dim) * 0.4 + 0.1
        # Higher lags: diagonal matrices with decaying coefficients
        for i in range(1, order):
            phi[i] = 0.3 / (i + 1) * np.eye(dim)
        # Recursively apply the VAR to the white-noise shocks, in place
        for i in range(order, nobs):
            for j in range(order):
                if dim == 1:
                    rvs[i] += np.squeeze(phi[j] * rvs[i - j - 1])
                else:
                    rvs[i] += phi[j] @ rvs[i - j - 1]
    if order > 1:
        # Build the companion-form matrix and verify that all eigenvalues
        # are inside the unit circle
        p = np.eye(dim * order, dim * order, -dim)
        for j in range(order):
            p[:dim, j * dim : (j + 1) * dim] = phi[j]
        v, _ = np.linalg.eig(p)
        assert np.max(np.abs(v)) < 1
    rvs = rvs[burn:]
    if use_pandas and dim == 1:
        return pd.Series(rvs, name="x")
    elif use_pandas:
        # No file output here: a fixture must not write into the working
        # directory as a side effect of running the tests
        return pd.DataFrame(rvs, columns=[f"x{i}" for i in range(dim)])

    return rvs
102 changes: 51 additions & 51 deletions arch/tests/covariance/test_var.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from itertools import product
from typing import Optional, Tuple

import numpy as np
from numpy.testing import assert_allclose
import pandas as pd
import pytest

from arch.covariance.kernel import CovarianceEstimate
from arch.covariance.var import PreWhitenRecoloredCovariance
from arch.typing import NDArray

# Only VAR order 0 is active here; orders 1 and 3 are currently disabled.
DATA_PARAMS = [
    combo for combo in product((1, 3), (True, False), (0,))
]
# Human-readable pytest ids, one per parameter combination.
DATA_IDS = [
    f"dim: {d}, pandas: {p}, order: {o}" for (d, p, o) in DATA_PARAMS
]
KERNELS = [
"Bartlett",
"Parzen",
Expand All @@ -32,40 +30,6 @@ def kernel(request):
return request.param


@pytest.fixture(scope="module", params=DATA_PARAMS, ids=DATA_IDS)
def data(request):
    """
    Simulate data from a VAR process for the tests in this module.

    ``request.param`` is a tuple ``(dim, pandas, order)``. Returns the 500
    observations that remain after a 100-observation burn-in, as an
    ndarray, a ``Series`` named "x" (pandas and ``dim`` equal to 1), or a
    ``DataFrame`` with columns "x0", "x1", ...
    """
    dim, as_pandas, order = request.param
    burn = 100
    total = burn + 500
    # Fixed seed so that every test sees identical data
    rng = np.random.RandomState([839084, 3823810, 982103, 829108])
    shape = (total, 3) if dim > 1 else (total,)
    sample = rng.standard_normal(shape)
    coefs = np.zeros((order, dim, dim))
    if order > 0:
        coefs[0] = np.eye(dim) * 0.4 + 0.1
        for lag in range(1, order):
            coefs[lag] = 0.3 / (lag + 1) * np.eye(dim)
        # Recursively apply the VAR to the white-noise shocks, in place
        for t in range(order, total):
            for lag in range(order):
                if dim == 1:
                    sample[t] += np.squeeze(coefs[lag] * sample[t - lag - 1])
                else:
                    sample[t] += coefs[lag] @ sample[t - lag - 1]
    if order > 1:
        # Companion-form eigenvalues must all lie inside the unit circle
        companion = np.eye(dim * order, dim * order, -dim)
        for lag in range(order):
            companion[:dim, lag * dim : (lag + 1) * dim] = coefs[lag]
        eigenvalues, _ = np.linalg.eig(companion)
        assert np.max(np.abs(eigenvalues)) < 1
    sample = sample[burn:]
    if not as_pandas:
        return sample
    if dim == 1:
        return pd.Series(sample, name="x")
    return pd.DataFrame(sample, columns=[f"x{i}" for i in range(dim)])


def direct_var(
x, const: bool, full_order: int, diag_order: int, max_order: Optional[int] = None
) -> Tuple[NDArray, NDArray]:
Expand Down Expand Up @@ -140,29 +104,38 @@ def direct_ic(
@pytest.mark.parametrize("diag_order", [3, 5])
@pytest.mark.parametrize("max_order", [None, 10])
@pytest.mark.parametrize("ic", ["aic", "bic", "hqc"])
def test_direct_var(data, const, full_order, diag_order, max_order, ic):
direct_ic(data, ic, const, full_order, diag_order, max_order)
def test_direct_var(covariance_data, const, full_order, diag_order, max_order, ic):
direct_ic(covariance_data, ic, const, full_order, diag_order, max_order)


@pytest.mark.parametrize("center", [True, False])
@pytest.mark.parametrize("diagonal", [True, False])
@pytest.mark.parametrize("method", ["aic", "bic", "hqc"])
def test_ic(data, center, diagonal, method):
def test_ic(covariance_data, center, diagonal, method):
pwrc = PreWhitenRecoloredCovariance(
data, center=center, diagonal=diagonal, method=method, bandwidth=0.0,
covariance_data, center=center, diagonal=diagonal, method=method, bandwidth=0.0,
)
cov = pwrc.cov
expected_type = np.ndarray if isinstance(data, np.ndarray) else pd.DataFrame
expected_type = (
np.ndarray if isinstance(covariance_data, np.ndarray) else pd.DataFrame
)
assert isinstance(cov.short_run, expected_type)
expected_max_lag = int(data.shape[0] ** (1 / 3))
expected_max_lag = int(covariance_data.shape[0] ** (1 / 3))
assert pwrc._max_lag == expected_max_lag
expected_ics = {}
for full_order in range(expected_max_lag + 1):
diag_limit = expected_max_lag + 1 if diagonal else full_order + 1
if covariance_data.ndim == 1 or covariance_data.shape[1] == 1:
diag_limit = full_order + 1
for diag_order in range(full_order, diag_limit):
key = (full_order, diag_order)
expected_ics[key] = direct_ic(
data, method, center, full_order, diag_order, max_order=expected_max_lag
covariance_data,
method,
center,
full_order,
diag_order,
max_order=expected_max_lag,
)
assert tuple(sorted(pwrc._ics.keys())) == tuple(sorted(expected_ics.keys()))
for key in expected_ics:
Expand All @@ -175,13 +148,18 @@ def test_ic(data, center, diagonal, method):
@pytest.mark.parametrize("diagonal", [True, False])
@pytest.mark.parametrize("method", ["aic", "bic", "hqc"])
@pytest.mark.parametrize("lags", [0, 1, 3])
def test_short_long_run(data, center, diagonal, method, lags):
def test_short_long_run(covariance_data, center, diagonal, method, lags):
pwrc = PreWhitenRecoloredCovariance(
data, center=center, diagonal=diagonal, method=method, lags=lags, bandwidth=0.0,
covariance_data,
center=center,
diagonal=diagonal,
method=method,
lags=lags,
bandwidth=0.0,
)
cov = pwrc.cov
full_order, diag_order = pwrc._order
params, resids = direct_var(data, center, full_order, diag_order)
params, resids = direct_var(covariance_data, center, full_order, diag_order)
nobs, nvar = resids.shape
expected_short_run = resids.T @ resids / nobs
assert_allclose(cov.short_run, expected_short_run)
Expand All @@ -195,12 +173,29 @@ def test_short_long_run(data, center, diagonal, method, lags):
assert_allclose(cov.long_run, expected_long_run)


@pytest.mark.parametrize("force_int", [True, False])
def test_pwrc_attributes(covariance_data, force_int):
    """Check the types of the estimator's attributes and covariance outputs."""
    pwrc = PreWhitenRecoloredCovariance(covariance_data, force_int=force_int)
    # Scalar kernel properties are all expected to be floats
    for attr in ("bandwidth_scale", "kernel_const", "rate"):
        assert isinstance(getattr(pwrc, attr), float)
    assert isinstance(pwrc._weights(), np.ndarray)
    assert pwrc.force_int == force_int
    # ndarray input gives ndarray output; any other input gives a DataFrame
    if isinstance(covariance_data, np.ndarray):
        expected_type = np.ndarray
    else:
        expected_type = pd.DataFrame
    for component in ("short_run", "long_run", "one_sided", "one_sided_strict"):
        assert isinstance(getattr(pwrc.cov, component), expected_type)


@pytest.mark.parametrize("sample_autocov", [True, False])
def test_data(data, sample_autocov):
def test_data(covariance_data, sample_autocov, kernel):
pwrc = PreWhitenRecoloredCovariance(
data, sample_autocov=sample_autocov, bandwidth=0.0
covariance_data, sample_autocov=sample_autocov, kernel=kernel, bandwidth=0.0
)
pwrc.cov
assert isinstance(pwrc.cov, CovarianceEstimate)


def test_pwrc_errors():
Expand All @@ -216,4 +211,9 @@ def test_pwrc_errors():
def test_pwrc_warnings():
x = np.random.standard_normal((9, 5))
with pytest.warns(RuntimeWarning, match="The maximum number of lags is 0"):
PreWhitenRecoloredCovariance(x).cov
assert isinstance(PreWhitenRecoloredCovariance(x).cov, CovarianceEstimate)


def test_unknown_kernel(covariance_data):
    """Check that an unrecognized kernel name raises ValueError."""
    # An empty ``match`` pattern matches every message, so it asserted nothing
    # about the error text; only the exception type is checked here.
    with pytest.raises(ValueError):
        PreWhitenRecoloredCovariance(covariance_data, kernel="unknown")
24 changes: 20 additions & 4 deletions arch/tests/unitroot/test_dynamic_ols.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ def test_smoke(data, trend, lags, leads, common, max_lag, method):
y, x = data
if common:
leads = lags
mod = DynamicOLS(y, x, trend, lags, leads, common, max_lag, max_lag, method)
mod = DynamicOLS(
y,
x,
trend,
lags=lags,
leads=leads,
common=common,
max_lag=max_lag,
max_lead=max_lag,
method=method,
)
mod.fit()


Expand All @@ -27,8 +37,14 @@ def test_smoke(data, trend, lags, leads, common, max_lag, method):
@pytest.mark.parametrize("df_adjust", [True, False])
def test_smoke_fit(data, cov_type, kernel, bandwidth, force_int, df_adjust):
y, x = data
mod = DynamicOLS(y, x, "ct", 3, 5, False)
res = mod.fit(cov_type, kernel, bandwidth, force_int, df_adjust)
mod = DynamicOLS(y, x, "ct", lags=3, leads=5, common=False)
res = mod.fit(
cov_type,
kernel=kernel,
bandwidth=bandwidth,
force_int=force_int,
df_adjust=df_adjust,
)
assert isinstance(res.leads, int)
assert isinstance(res.lags, int)
assert isinstance(res.bandwidth, (int, float))
Expand All @@ -44,7 +60,7 @@ def test_smoke_fit(data, cov_type, kernel, bandwidth, force_int, df_adjust):
def test_mismatch_lead_lag(data):
y, x = data
with pytest.raises(ValueError, match="common is specified but leads"):
DynamicOLS(y, x, "c", 4, 5, True)
DynamicOLS(y, x, "c", lags=4, leads=5, common=True)
with pytest.raises(ValueError, match="common is specified but max_lead"):
DynamicOLS(y, x, max_lag=6, max_lead=7, common=True)

Expand Down
Loading

0 comments on commit a3e0951

Please sign in to comment.