
Commit

Update tests
jaclark5 committed Jul 25, 2024
1 parent b55552f commit b5fad16
Showing 5 changed files with 158 additions and 61 deletions.
106 changes: 63 additions & 43 deletions src/alchemlyb/convergence/convergence.py
@@ -118,16 +118,7 @@ def forward_backward_convergence(
for i in range(1, num + 1):
logger.info("Backward analysis: {:.2f}%".format(100 * i / num))
sample = []
for ii, data in enumerate(df_list):
if (
estimator in ["MBAR", "BAR"]
and len(np.unique(np.array([x[1] for x in data.index.to_numpy()]))) > 1
):
raise ValueError(
"Restrict to a single fep-lambda value for a meaningful result in df_list[{}]".format(
ii
)
)
for data in df_list:
sample.append(data[-len(data) // num * i :])
mean, error = _forward_backward_convergence_estimate(
sample, estimator, my_estimator, error_tol, **kwargs
@@ -453,44 +444,73 @@ def moving_average(df_list, estimator="MBAR", num=10, **kwargs):
estimator_fit = estimators_dispatch[estimator](**kwargs).fit
logger.info(f"Use {estimator} estimator for convergence analysis.")

logger.info("Begin Moving Average Analysis")
logger.info("Check indices")
if estimator in ["MBAR"]:
index_1 = [
np.unique(np.array([x[1] for x in data.index.to_numpy()]))
for data in df_list
]
if len(np.unique(index_1)) == 1 and len(df_list[0].index[0]) > 2:
index_2 = [
np.unique(np.array([x[2] for x in data.index.to_numpy()]))
for data in df_list
]
if len(np.unique(index_2)) > 1:
raise ValueError(
"Restrict to a single fep-lambda value for a meaningful result. index[2] for each file"
" in df_list: {}".format(index_2)
)
elif len(np.unique(index_1)) != 1:
raise ValueError(
"Restrict to a single fep-lambda value for a meaningful result. index[1] for each file"
" in df_list: {}".format(index_1)
)
elif estimator in ["BAR"]:
index_1 = [
np.unique(np.array([x[1] for x in data.index.to_numpy()]))
for data in df_list
]
if len(np.unique(index_1)) == 1 and len(df_list[0].index[0]) > 2:
index_2 = [
np.unique(np.array([x[2] for x in data.index.to_numpy()]))
for data in df_list
]
if len(np.unique(index_2)) != 2:
raise ValueError(
"Restrict to a fep-lambda value and its forward adjacent state for a meaningful "
"result. index[2] for each file in df_list: {}".format(index_2)
)
elif len(np.unique(index_1)) != 2:
raise ValueError(
"Restrict to a fep-lambda value and its forward adjacent state for a meaningful "
"result. index[1] for each file in df_list: {}".format(index_1)
)

logger.info("Begin Moving Average Analysis")
average_list = []
average_error_list = []

# Concatenate dataframes
sample = []
data = df_list[0]
for tmp_data in df_list[1:]:
data = concat([data, tmp_data])

for i in range(1, num):
logger.info("Moving Average Analysis: {:.2f}%".format(100 * i / num))
sample = []
for ii, data in enumerate(df_list):
fep_values = np.unique(np.array([x[1] for x in data.index.to_numpy()]))
if estimator == "MBAR":
if len(fep_values) > 1:
raise ValueError(
"Restrict to a single fep-lambda value for a meaningful result in df_list[{}]".format(
ii
)
)
else:
sample.append(
data[len(data) // num * (i - 1) : len(data) // num * i]
)
elif estimator == "BAR":
if len(fep_values) > 2:
raise ValueError(
"Restrict to a fep-lambda value and its forward adjacent state for a meaningful result in df_list[{}]".format(
ii
)
)
else:
data1 = data.iloc[
data.index.get_level_values("fep-lambda").isin([fep_values[0]])
]
data2 = data.iloc[
data.index.get_level_values("fep-lambda").isin([fep_values[1]])
]
lx = min(len(data1), len(data2))
ind1, ind2 = lx // num * (i - 1), lx // num * i
sample.append(concat([data1[ind1:ind2], data2[ind1:ind2]]))
sample = concat(sample)
if estimator == "MBAR":
sample = data[len(data) // num * (i - 1) : len(data) // num * i]
elif estimator == "BAR":
ind, indices = 1, np.unique(np.array([x[1] for x in data.index.to_numpy()]))
if len(indices) != 2 and len(df_list[0].index[0]) > 2:
ind, indices = 2, np.unique(
np.array([x[2] for x in data.index.to_numpy()])
)
data1 = data.iloc[data.index.get_level_values(ind).isin([indices[0]])]
data2 = data.iloc[data.index.get_level_values(ind).isin([indices[1]])]
lx = min(len(data1), len(data2))
ind1, ind2 = lx // num * (i - 1), lx // num * i
sample = concat([data1[ind1:ind2], data2[ind1:ind2]])
result = estimator_fit(sample)

average_list.append(result.delta_f_.iloc[0, -1])
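For context, the checks added above mean moving_average now expects MBAR input restricted to a single fep-lambda value and BAR input restricted to a fep-lambda value plus its forward adjacent state. The following is a minimal usage sketch, not part of this commit, assuming the alchemtest benzene dataset, the standard GROMACS parser, and that moving_average is exported from alchemlyb.convergence on this branch.

from alchemtest.gmx import load_benzene
from alchemlyb.parsing.gmx import extract_u_nk
from alchemlyb.convergence import moving_average

# One u_nk DataFrame per lambda window; each covers a single fep-lambda value.
u_nk_list = [extract_u_nk(xvg, T=300) for xvg in load_benzene().data["Coulomb"]]

# MBAR: pass a single window so the upfront index check passes.
df = moving_average([u_nk_list[0]], estimator="MBAR", num=10)
print(df[["FE", "FE_Error"]])  # num - 1 rows, one per block of the moving average
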
19 changes: 9 additions & 10 deletions src/alchemlyb/estimators/bar_.py
@@ -88,29 +88,30 @@ def fit(self, u_nk):
# sort by state so that rows from same state are in contiguous blocks
u_nk = u_nk.sort_index(level=u_nk.index.names[1:])

# get a list of the lambda states that are sampled
self._states_ = u_nk.columns.values.tolist()

# group u_nk by lambda states
groups = u_nk.groupby(level=u_nk.index.names[1:])
N_k = [
(len(groups.get_group(i)) if i in groups.groups else 0)
for i in u_nk.columns
]

# get a list of the lambda states that are sampled
self._states_ = [x for i, x in enumerate(u_nk.columns.values.tolist()) if N_k[i] > 0]
N_k = [x for x in N_k if x > 0]

states = [x for i, x in enumerate(self._states_) if N_k[i] > 0]
# Now get free energy differences and their uncertainties for each step
deltas = np.array([])
d_deltas = np.array([])
for k in range(len(N_k) - 1):
if N_k[k] == 0 or N_k[k + 1] == 0:
continue
# get us from lambda step k
uk = groups.get_group(self._states_[k])
# get w_F
w_f = uk.iloc[:, k + 1] - uk.iloc[:, k]

# get us from lambda step k+1
uk1 = groups.get_group(self._states_[k + 1])

# get w_R
w_r = uk1.iloc[:, k] - uk1.iloc[:, k + 1]

@@ -152,13 +153,11 @@ def fit(self, u_nk):
ad_delta += np.diagflat(np.array(dout), k=j + 1)

# yield standard delta_f_ free energies between each state
self._delta_f_ = pd.DataFrame(
adelta - adelta.T, columns=self._states_, index=self._states_
)
self._delta_f_ = pd.DataFrame(adelta - adelta.T, columns=states, index=states)

# yield standard deviation d_delta_f_ between each state
self._d_delta_f_ = pd.DataFrame(
np.sqrt(ad_delta + ad_delta.T), columns=self._states_, index=self._states_
np.sqrt(ad_delta + ad_delta.T), columns=states, index=states
)
self._delta_f_.attrs = u_nk.attrs
self._d_delta_f_.attrs = u_nk.attrs
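The bar_.py change above derives the reported states from the populated groups, so the free-energy matrices skip lambda columns that carry no samples. An illustrative check follows; the alchemtest benzene data source is an assumption for demonstration and is not part of this diff.

import alchemlyb
from alchemtest.gmx import load_benzene
from alchemlyb.parsing.gmx import extract_u_nk
from alchemlyb.estimators import BAR

u_nk = alchemlyb.concat(
    [extract_u_nk(xvg, T=300) for xvg in load_benzene().data["Coulomb"]]
)
bar = BAR().fit(u_nk)

print(u_nk.columns.tolist())          # every lambda state appears as a column
print(bar.delta_f_.columns.tolist())  # only states with samples (N_k > 0) remain
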
21 changes: 20 additions & 1 deletion src/alchemlyb/estimators/mbar_.py
@@ -33,7 +33,7 @@ class MBAR(BaseEstimator, _EstimatorMixOut):
.. versionchanged:: 2.3.0
The new default is now "BAR" as it provides a substantial speedup
over the previous default `None`.
method : str, optional, default="robust"
The optimization routine to use. This can be any of the methods
@@ -135,6 +135,25 @@ def fit(self, u_nk):
)
bar.fit(u_nk)
initial_f_k = bar.delta_f_.iloc[0, :]
states = [
    x
    for i, x in enumerate(self._states_[:-1])
    if N_k[i] > 0 and N_k[i + 1] > 0
]
# Pad the BAR guess with NaN so it aligns with the full set of lambda states.
if len(initial_f_k) != len(self._states_):
    initial_f_k = pd.Series(
        [
            initial_f_k.loc[x] if x in states else np.nan
            for x in self._states_
        ],
        index=self._states_,
        dtype=float,
    )
else:
initial_f_k = self.initial_f_k

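The mbar_.py hunk pads the BAR-seeded initial guess with NaN for lambda states that were never sampled, so its length matches the full column set of u_nk. Below is a standalone sketch of that padding idea with made-up numbers; the state values and the missing 0.75 state are hypothetical and this is not library API.

import numpy as np
import pandas as pd

all_states = [0.0, 0.25, 0.5, 0.75, 1.0]  # hypothetical u_nk columns
bar_guess = pd.Series(                    # hypothetical BAR estimate; 0.75 unsampled
    [0.0, 1.2, 2.0, 2.9], index=[0.0, 0.25, 0.5, 1.0]
)

initial_f_k = pd.Series(
    [bar_guess.loc[x] if x in bar_guess.index else np.nan for x in all_states],
    index=all_states,
    dtype=float,
)
print(initial_f_k)  # NaN at 0.75, the unsampled state
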
62 changes: 56 additions & 6 deletions src/alchemlyb/tests/test_convergence.py
@@ -31,14 +31,64 @@ def test_convergence_fep(gmx_benzene_Coulomb_u_nk, estimator):
assert convergence.loc[9, "Backward"] == pytest.approx(3.04, 0.01)


@pytest.mark.parametrize("estimator", ["DUMMY"])
def test_moving_average_error_1(gmx_ABFE_complex_u_nk, estimator):
with pytest.raises(ValueError, match=r"Estimator DUMMY is not available .*"):
_ = moving_average(gmx_ABFE_complex_u_nk, estimator)


@pytest.mark.parametrize("estimator", ["MBAR"])
def test_moving_average_error_2_mbar(gmx_ABFE_complex_u_nk, estimator):
df_list = gmx_ABFE_complex_u_nk[10:15]
with pytest.raises(
ValueError,
match=r"Restrict to a single fep-lambda value for a meaningful result. .*",
):
_ = moving_average(df_list, estimator)

df_list = gmx_ABFE_complex_u_nk[14:17]
with pytest.raises(
ValueError,
match=r"Restrict to a single fep-lambda value for a meaningful result. .*",
):
_ = moving_average(df_list, estimator)


@pytest.mark.parametrize("estimator", ["BAR"])
def test_moving_average_error_2_bar(gmx_ABFE_complex_u_nk, estimator):
df_list = gmx_ABFE_complex_u_nk[10:13]
with pytest.raises(
ValueError,
match=r"Restrict to a fep-lambda value and its forward adjacent state .*",
):
_ = moving_average(df_list, estimator)

df_list = gmx_ABFE_complex_u_nk[14:17]
with pytest.raises(
ValueError,
match=r"Restrict to a fep-lambda value and its forward adjacent state .*",
):
_ = moving_average(df_list, estimator)


@pytest.mark.parametrize("estimator", ["BAR"])
def test_moving_average_bar(gmx_ABFE_complex_u_nk, estimator):
df_avg = moving_average(gmx_ABFE_complex_u_nk[14:16], estimator)
assert df_avg.shape == (9, 2)
assert df_avg.loc[0, "FE"] == pytest.approx(0.658, 0.01)
assert df_avg.loc[0, "FE_Error"] == pytest.approx(0.054, 0.1)
assert df_avg.loc[8, "FE"] == pytest.approx(0.926, 0.01)
assert df_avg.loc[8, "FE_Error"] == pytest.approx(0.05, 0.1)


@pytest.mark.parametrize("estimator", ["MBAR"])
def test_moving_average_fep(gmx_benzene_Coulomb_u_nk, estimator):
df_avg = moving_average(gmx_benzene_Coulomb_u_nk, estimator)
def test_moving_average_mbar(gmx_benzene_Coulomb_u_nk, estimator):
df_avg = moving_average([gmx_benzene_Coulomb_u_nk[0]], estimator)
assert df_avg.shape == (9, 2)
assert df_avg.loc[0, "FE"] == pytest.approx(3.01, 0.01)
assert df_avg.loc[0, "FE_Error"] == pytest.approx(0.067, 0.01)
assert df_avg.loc[8, "FE"] == pytest.approx(3.10, 0.01)
assert df_avg.loc[8, "FE_Error"] == pytest.approx(0.066, 0.01)
assert df_avg.loc[0, "FE"] == pytest.approx(3.41, 0.01)
assert df_avg.loc[0, "FE_Error"] == pytest.approx(0.22, 0.01)
assert df_avg.loc[8, "FE"] == pytest.approx(2.83, 0.01)
assert df_avg.loc[8, "FE_Error"] == pytest.approx(0.33, 0.01)


def test_convergence_wrong_estimator(gmx_benzene_Coulomb_dHdl):
11 changes: 10 additions & 1 deletion src/alchemlyb/tests/test_visualisation.py
@@ -147,7 +147,7 @@ def test_plot_dF_state(


def test_plot_convergence_dataframe(gmx_benzene_Coulomb_u_nk):
df = forward_backward_convergence(gmx_benzene_Coulomb_u_nk, "MBAR")
df = forward_backward_convergence([gmx_benzene_Coulomb_u_nk[0]], "MBAR")
ax = plot_convergence(df)
assert isinstance(ax, matplotlib.axes.Axes)
plt.close(ax.figure)
@@ -241,6 +241,15 @@ def test_plot_moving_average(gmx_benzene_Coulomb_u_nk):
assert isinstance(ax, matplotlib.axes.Axes)
plt.close(ax.figure)

ax = plot_moving_average(df, units="kJ/mol")
assert isinstance(ax, matplotlib.axes.Axes)
plt.close(ax.figure)

df = df.drop("FE_Error", axis=1)
ax = plot_moving_average(df)
assert isinstance(ax, matplotlib.axes.Axes)
plt.close(ax.figure)


class Test_Units:
@staticmethod
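A usage sketch mirroring the expanded visualisation test above; it assumes the alchemtest benzene data and that moving_average and plot_moving_average are exported from alchemlyb.convergence and alchemlyb.visualisation on this branch.

import matplotlib.pyplot as plt
from alchemtest.gmx import load_benzene
from alchemlyb.parsing.gmx import extract_u_nk
from alchemlyb.convergence import moving_average
from alchemlyb.visualisation import plot_moving_average

u_nk_list = [extract_u_nk(xvg, T=300) for xvg in load_benzene().data["Coulomb"]]
df = moving_average([u_nk_list[0]], "MBAR")

ax = plot_moving_average(df, units="kJ/mol")  # explicit unit conversion
plt.close(ax.figure)

ax = plot_moving_average(df.drop("FE_Error", axis=1))  # without error bars
plt.close(ax.figure)
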
