Skip to content

Commit

Permalink
More work and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bashtage committed Apr 2, 2020
1 parent cc62df7 commit 3cb3d24
Show file tree
Hide file tree
Showing 10 changed files with 1,379 additions and 1,230 deletions.
1 change: 1 addition & 0 deletions arch/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pytest_plugins = [
"arch.tests.unitroot.cointegration_data",
"arch.tests.covariance.covariance_data",
]


Expand Down
1 change: 1 addition & 0 deletions arch/covariance/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class CovarianceEstimator(ABC):
def __init__(
self,
x: ArrayLike,
*,
bandwidth: Optional[float] = None,
df_adjust: int = 0,
center: bool = True,
Expand Down
19 changes: 16 additions & 3 deletions arch/covariance/var.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class PreWhitenRecoloredCovariance(CovarianceEstimator):
df_adjust : int, default 0
center : bool, default True
weights : array_like, default None
force_int: bool, default False
See Also
--------
Expand All @@ -52,6 +53,7 @@ class PreWhitenRecoloredCovariance(CovarianceEstimator):
def __init__(
self,
x: ArrayLike,
*,
lags: Optional[int] = None,
method: str = "aic",
diagonal: bool = True,
Expand All @@ -62,9 +64,15 @@ def __init__(
df_adjust: int = 0,
center: bool = True,
weights: Optional[ArrayLike] = None,
force_int: bool = False,
) -> None:
super().__init__(
x, bandwidth=bandwidth, df_adjust=df_adjust, center=center, weights=weights
x,
bandwidth=bandwidth,
df_adjust=df_adjust,
center=center,
weights=weights,
force_int=force_int,
)
self._kernel_name = kernel
self._lags = 0
Expand Down Expand Up @@ -155,7 +163,7 @@ def _ic_from_vars(
ics: Dict[Tuple[int, int], float] = {
(full_order, full_order): self._ic(sigma, nparam, nobs)
}
if not self._diagonal:
if not self._diagonal or self._x.shape[1] == 1:
return ics

purged_indiv_lags = np.empty((nvar, nobs, max_lag - full_order))
Expand Down Expand Up @@ -294,7 +302,12 @@ def cov(self) -> CovarianceEstimate:
resids = var_mod.resids
nobs, nvar = resids.shape
self._kernel_instance = self._kernel(
resids, self._bandwidth, 0, False, self._x_weights, self._force_int
resids,
bandwidth=self._bandwidth,
df_adjust=0,
center=False,
weights=self._x_weights,
force_int=self._force_int,
)
kern_cov = self._kernel_instance.cov
short_run = kern_cov.short_run
Expand Down
45 changes: 45 additions & 0 deletions arch/tests/covariance/covariance_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from itertools import product

import numpy as np
import pandas as pd
import pytest

DATA_PARAMS = list(product([1, 3], [True, False], [0, 1, 3]))
DATA_IDS = [f"dim: {d}, pandas: {p}, order: {o}" for d, p, o in DATA_PARAMS]


@pytest.fixture(scope="module", params=DATA_PARAMS, ids=DATA_IDS)
def covariance_data(request):
dim, pandas, order = request.param
rs = np.random.RandomState([839084, 3823810, 982103, 829108])
burn = 100
shape = (burn + 500,)
if dim > 1:
shape += (3,)
rvs = rs.standard_normal(shape)
phi = np.zeros((order, dim, dim))
if order > 0:
phi[0] = np.eye(dim) * 0.4 + 0.1
for i in range(1, order):
phi[i] = 0.3 / (i + 1) * np.eye(dim)
for i in range(order, burn + 500):
for j in range(order):
if dim == 1:
rvs[i] += np.squeeze(phi[j] * rvs[i - j - 1])
else:
rvs[i] += phi[j] @ rvs[i - j - 1]
if order > 1:
p = np.eye(dim * order, dim * order, -dim)
for j in range(order):
p[:dim, j * dim : (j + 1) * dim] = phi[j]
v, _ = np.linalg.eig(p)
assert np.max(np.abs(v)) < 1
rvs = rvs[burn:]
if pandas and dim == 1:
return pd.Series(rvs, name="x")
elif pandas:
df = pd.DataFrame(rvs, columns=[f"x{i}" for i in range(dim)])
df.to_csv(f"cov-data-order-{order}.csv")
return df

return rvs
102 changes: 51 additions & 51 deletions arch/tests/covariance/test_var.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from itertools import product
from typing import Optional, Tuple

import numpy as np
from numpy.testing import assert_allclose
import pandas as pd
import pytest

from arch.covariance.kernel import CovarianceEstimate
from arch.covariance.var import PreWhitenRecoloredCovariance
from arch.typing import NDArray

DATA_PARAMS = list(product([1, 3], [True, False], [0])) # , 1, 3]))
DATA_IDS = [f"dim: {d}, pandas: {p}, order: {o}" for d, p, o in DATA_PARAMS]
KERNELS = [
"Bartlett",
"Parzen",
Expand All @@ -32,40 +30,6 @@ def kernel(request):
return request.param


@pytest.fixture(scope="module", params=DATA_PARAMS, ids=DATA_IDS)
def data(request):
dim, pandas, order = request.param
rs = np.random.RandomState([839084, 3823810, 982103, 829108])
burn = 100
shape = (burn + 500,)
if dim > 1:
shape += (3,)
rvs = rs.standard_normal(shape)
phi = np.zeros((order, dim, dim))
if order > 0:
phi[0] = np.eye(dim) * 0.4 + 0.1
for i in range(1, order):
phi[i] = 0.3 / (i + 1) * np.eye(dim)
for i in range(order, burn + 500):
for j in range(order):
if dim == 1:
rvs[i] += np.squeeze(phi[j] * rvs[i - j - 1])
else:
rvs[i] += phi[j] @ rvs[i - j - 1]
if order > 1:
p = np.eye(dim * order, dim * order, -dim)
for j in range(order):
p[:dim, j * dim : (j + 1) * dim] = phi[j]
v, _ = np.linalg.eig(p)
assert np.max(np.abs(v)) < 1
rvs = rvs[burn:]
if pandas and dim == 1:
return pd.Series(rvs, name="x")
elif pandas:
return pd.DataFrame(rvs, columns=[f"x{i}" for i in range(dim)])
return rvs


def direct_var(
x, const: bool, full_order: int, diag_order: int, max_order: Optional[int] = None
) -> Tuple[NDArray, NDArray]:
Expand Down Expand Up @@ -140,29 +104,38 @@ def direct_ic(
@pytest.mark.parametrize("diag_order", [3, 5])
@pytest.mark.parametrize("max_order", [None, 10])
@pytest.mark.parametrize("ic", ["aic", "bic", "hqc"])
def test_direct_var(data, const, full_order, diag_order, max_order, ic):
direct_ic(data, ic, const, full_order, diag_order, max_order)
def test_direct_var(covariance_data, const, full_order, diag_order, max_order, ic):
direct_ic(covariance_data, ic, const, full_order, diag_order, max_order)


@pytest.mark.parametrize("center", [True, False])
@pytest.mark.parametrize("diagonal", [True, False])
@pytest.mark.parametrize("method", ["aic", "bic", "hqc"])
def test_ic(data, center, diagonal, method):
def test_ic(covariance_data, center, diagonal, method):
pwrc = PreWhitenRecoloredCovariance(
data, center=center, diagonal=diagonal, method=method, bandwidth=0.0,
covariance_data, center=center, diagonal=diagonal, method=method, bandwidth=0.0,
)
cov = pwrc.cov
expected_type = np.ndarray if isinstance(data, np.ndarray) else pd.DataFrame
expected_type = (
np.ndarray if isinstance(covariance_data, np.ndarray) else pd.DataFrame
)
assert isinstance(cov.short_run, expected_type)
expected_max_lag = int(data.shape[0] ** (1 / 3))
expected_max_lag = int(covariance_data.shape[0] ** (1 / 3))
assert pwrc._max_lag == expected_max_lag
expected_ics = {}
for full_order in range(expected_max_lag + 1):
diag_limit = expected_max_lag + 1 if diagonal else full_order + 1
if covariance_data.ndim == 1 or covariance_data.shape[1] == 1:
diag_limit = full_order + 1
for diag_order in range(full_order, diag_limit):
key = (full_order, diag_order)
expected_ics[key] = direct_ic(
data, method, center, full_order, diag_order, max_order=expected_max_lag
covariance_data,
method,
center,
full_order,
diag_order,
max_order=expected_max_lag,
)
assert tuple(sorted(pwrc._ics.keys())) == tuple(sorted(expected_ics.keys()))
for key in expected_ics:
Expand All @@ -175,13 +148,18 @@ def test_ic(data, center, diagonal, method):
@pytest.mark.parametrize("diagonal", [True, False])
@pytest.mark.parametrize("method", ["aic", "bic", "hqc"])
@pytest.mark.parametrize("lags", [0, 1, 3])
def test_short_long_run(data, center, diagonal, method, lags):
def test_short_long_run(covariance_data, center, diagonal, method, lags):
pwrc = PreWhitenRecoloredCovariance(
data, center=center, diagonal=diagonal, method=method, lags=lags, bandwidth=0.0,
covariance_data,
center=center,
diagonal=diagonal,
method=method,
lags=lags,
bandwidth=0.0,
)
cov = pwrc.cov
full_order, diag_order = pwrc._order
params, resids = direct_var(data, center, full_order, diag_order)
params, resids = direct_var(covariance_data, center, full_order, diag_order)
nobs, nvar = resids.shape
expected_short_run = resids.T @ resids / nobs
assert_allclose(cov.short_run, expected_short_run)
Expand All @@ -195,12 +173,29 @@ def test_short_long_run(data, center, diagonal, method, lags):
assert_allclose(cov.long_run, expected_long_run)


@pytest.mark.parametrize("force_int", [True, False])
def test_pwrc_attributes(covariance_data, force_int):
pwrc = PreWhitenRecoloredCovariance(covariance_data, force_int=force_int)
assert isinstance(pwrc.bandwidth_scale, float)
assert isinstance(pwrc.kernel_const, float)
assert isinstance(pwrc.rate, float)
assert isinstance(pwrc._weights(), np.ndarray)
assert pwrc.force_int == force_int
expected_type = (
np.ndarray if isinstance(covariance_data, np.ndarray) else pd.DataFrame
)
assert isinstance(pwrc.cov.short_run, expected_type)
assert isinstance(pwrc.cov.long_run, expected_type)
assert isinstance(pwrc.cov.one_sided, expected_type)
assert isinstance(pwrc.cov.one_sided_strict, expected_type)


@pytest.mark.parametrize("sample_autocov", [True, False])
def test_data(data, sample_autocov):
def test_data(covariance_data, sample_autocov, kernel):
pwrc = PreWhitenRecoloredCovariance(
data, sample_autocov=sample_autocov, bandwidth=0.0
covariance_data, sample_autocov=sample_autocov, kernel=kernel, bandwidth=0.0
)
pwrc.cov
assert isinstance(pwrc.cov, CovarianceEstimate)


def test_pwrc_errors():
Expand All @@ -216,4 +211,9 @@ def test_pwrc_errors():
def test_pwrc_warnings():
x = np.random.standard_normal((9, 5))
with pytest.warns(RuntimeWarning, match="The maximum number of lags is 0"):
PreWhitenRecoloredCovariance(x).cov
assert isinstance(PreWhitenRecoloredCovariance(x).cov, CovarianceEstimate)


def test_unknown_kernel(covariance_data):
with pytest.raises(ValueError, match=""):
PreWhitenRecoloredCovariance(covariance_data, kernel="unknown")
24 changes: 20 additions & 4 deletions arch/tests/unitroot/test_dynamic_ols.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ def test_smoke(data, trend, lags, leads, common, max_lag, method):
y, x = data
if common:
leads = lags
mod = DynamicOLS(y, x, trend, lags, leads, common, max_lag, max_lag, method)
mod = DynamicOLS(
y,
x,
trend,
lags=lags,
leads=leads,
common=common,
max_lag=max_lag,
max_lead=max_lag,
method=method,
)
mod.fit()


Expand All @@ -27,8 +37,14 @@ def test_smoke(data, trend, lags, leads, common, max_lag, method):
@pytest.mark.parametrize("df_adjust", [True, False])
def test_smoke_fit(data, cov_type, kernel, bandwidth, force_int, df_adjust):
y, x = data
mod = DynamicOLS(y, x, "ct", 3, 5, False)
res = mod.fit(cov_type, kernel, bandwidth, force_int, df_adjust)
mod = DynamicOLS(y, x, "ct", lags=3, leads=5, common=False)
res = mod.fit(
cov_type,
kernel=kernel,
bandwidth=bandwidth,
force_int=force_int,
df_adjust=df_adjust,
)
assert isinstance(res.leads, int)
assert isinstance(res.lags, int)
assert isinstance(res.bandwidth, (int, float))
Expand All @@ -44,7 +60,7 @@ def test_smoke_fit(data, cov_type, kernel, bandwidth, force_int, df_adjust):
def test_mismatch_lead_lag(data):
y, x = data
with pytest.raises(ValueError, match="common is specified but leads"):
DynamicOLS(y, x, "c", 4, 5, True)
DynamicOLS(y, x, "c", lags=4, leads=5, common=True)
with pytest.raises(ValueError, match="common is specified but max_lead"):
DynamicOLS(y, x, max_lag=6, max_lead=7, common=True)

Expand Down
12 changes: 6 additions & 6 deletions arch/tests/unitroot/test_fmols_ccr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def test_fmols_smoke(
y, x = trivariate_data
if x_trend is not None and len(x_trend) < len(trend):
x_trend = trend
mod = FullyModifiedOLS(y, x, trend, x_trend)
res = mod.fit(kernel, bandwidth, force_int, diff)
mod = FullyModifiedOLS(y, x, trend, x_trend=x_trend)
res = mod.fit(kernel=kernel, bandwidth=bandwidth, force_int=force_int, diff=diff)
assert isinstance(res.summary(), Summary)


Expand All @@ -35,8 +35,8 @@ def test_ccr_smoke(trivariate_data, trend, x_trend, diff, kernel, bandwidth, for
y, x = trivariate_data
if x_trend is not None and len(x_trend) < len(trend):
x_trend = trend
mod = CanonicalCointegratingReg(y, x, trend, x_trend)
res = mod.fit(kernel, bandwidth, force_int, diff)
mod = CanonicalCointegratingReg(y, x, trend, x_trend=x_trend)
res = mod.fit(kernel=kernel, bandwidth=bandwidth, force_int=force_int, diff=diff)
assert isinstance(res.summary(), Summary)


Expand Down Expand Up @@ -189,7 +189,7 @@ def test_fmols_eviews(trivariate_data, test_key):
trend, x_trend, diff = test_key
key = (trend, x_trend, diff)
test_res = setup_test_values(FMOLS_RES[key])
mod = FullyModifiedOLS(y, x, trend, x_trend)
mod = FullyModifiedOLS(y, x, trend, x_trend=x_trend)
# BW is one less than what Eviews reports
res = mod.fit(bandwidth=6, force_int=True, diff=diff, df_adjust=True)
if trend != "ctt":
Expand Down Expand Up @@ -322,7 +322,7 @@ def test_ccr_eviews(trivariate_data, test_key):
trend, x_trend, diff = test_key
key = (trend, x_trend, diff)
test_res = setup_test_values(CCR_RES[key])
mod = CanonicalCointegratingReg(y, x, trend, x_trend)
mod = CanonicalCointegratingReg(y, x, trend, x_trend=x_trend)
# BW is one less than what Eviews reports
res = mod.fit(bandwidth=6, force_int=True, diff=diff, df_adjust=True)
if trend == "n":
Expand Down
Loading

0 comments on commit 3cb3d24

Please sign in to comment.