Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel read and preprocess the data #371

Merged
merged 15 commits into from
Oct 6, 2024
5 changes: 4 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ The rules for this file:
* release numbers follow "Semantic Versioning" https://semver.org

------------------------------------------------------------------------------
??/??/2024 orbeckst
??/??/2024 orbeckst, xiki-tempula
orbeckst marked this conversation as resolved.
Show resolved Hide resolved

* 2.3.1

Changes:
- alchemlyb adopts SPEC 0 (replaces NEP 29)
https://scientific-python.org/specs/spec-0000/

Enhancements:
- Parallelise read and preprocess for ABFE workflow. (PR #371)


21/05/2024 xiki-tempula

Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ dependencies:
- pyarrow
- matplotlib
- loguru
- joblib
orbeckst marked this conversation as resolved.
Show resolved Hide resolved
89 changes: 79 additions & 10 deletions src/alchemlyb/tests/test_workflow_ABFE.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os

import numpy as np
import pandas as pd
import pytest
from alchemtest.amber import load_bace_example
from alchemtest.gmx import load_ABFE
from joblib import parallel_config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just import joblib so that it's clearer below what comes from joblib? In this way any un-qualified functions and classes are alchemlyb and everything else is external.


import alchemlyb.parsing.amber
from alchemlyb.workflows.abfe import ABFE
Expand All @@ -30,6 +32,7 @@ def workflow(tmp_path_factory):
overlap="O_MBAR.pdf",
breakdown=True,
forwrev=10,
n_jobs=1,
)
return workflow

Expand Down Expand Up @@ -79,6 +82,7 @@ def test_invalid_estimator(self, workflow):
overlap=None,
breakdown=None,
forwrev=None,
n_jobs=1,
)

def test_single_estimator(self, workflow, monkeypatch):
Expand All @@ -88,15 +92,25 @@ def test_single_estimator(self, workflow, monkeypatch):
monkeypatch.setattr(workflow, "dHdl_sample_list", [])
monkeypatch.setattr(workflow, "estimator", dict())
workflow.run(
uncorr=None, estimators="MBAR", overlap=None, breakdown=True, forwrev=None
uncorr=None,
estimators="MBAR",
overlap=None,
breakdown=True,
forwrev=None,
n_jobs=1,
)
assert "MBAR" in workflow.estimator

@pytest.mark.parametrize("forwrev", [None, False, 0])
def test_no_forwrev(self, workflow, monkeypatch, forwrev):
monkeypatch.setattr(workflow, "convergence", None)
workflow.run(
uncorr=None, estimators=None, overlap=None, breakdown=None, forwrev=forwrev
uncorr=None,
estimators=None,
overlap=None,
breakdown=None,
forwrev=forwrev,
n_jobs=1,
)
assert workflow.convergence is None

Expand Down Expand Up @@ -128,7 +142,7 @@ def test_read_TI_FEP(self, workflow, monkeypatch, read_u_nk, read_dHdl):
monkeypatch.setattr(workflow, "dHdl_list", [])
monkeypatch.setattr(workflow, "u_nk_sample_list", [])
monkeypatch.setattr(workflow, "dHdl_sample_list", [])
workflow.read(read_u_nk, read_dHdl)
workflow.read(read_u_nk, read_dHdl, n_jobs=1)
if read_u_nk:
assert len(workflow.u_nk_list) == 30
else:
Expand All @@ -148,7 +162,7 @@ def extract_u_nk(self, T):

monkeypatch.setattr(workflow, "_extract_u_nk", extract_u_nk)
with pytest.raises(OSError, match=r"Error reading u_nk"):
workflow.read()
workflow.read(n_jobs=1)

def test_read_invalid_dHdl(self, workflow, monkeypatch):
monkeypatch.setattr(workflow, "u_nk_sample_list", [])
Expand All @@ -159,7 +173,7 @@ def extract_dHdl(self, T):

monkeypatch.setattr(workflow, "_extract_dHdl", extract_dHdl)
with pytest.raises(OSError, match=r"Error reading dHdl"):
workflow.read()
workflow.read(n_jobs=1)


class TestSubsample:
Expand All @@ -181,22 +195,22 @@ def test_uncorr_threshold(self, workflow, monkeypatch):
)
monkeypatch.setattr(workflow, "u_nk_sample_list", [])
monkeypatch.setattr(workflow, "dHdl_sample_list", [])
workflow.preprocess(threshold=50)
workflow.preprocess(threshold=50, n_jobs=1)
assert all([len(u_nk) == 40 for u_nk in workflow.u_nk_sample_list])
assert all([len(dHdl) == 40 for dHdl in workflow.dHdl_sample_list])

def test_no_u_nk_preprocess(self, workflow, monkeypatch):
monkeypatch.setattr(workflow, "u_nk_list", [])
monkeypatch.setattr(workflow, "u_nk_sample_list", [])
monkeypatch.setattr(workflow, "dHdl_sample_list", [])
workflow.preprocess(threshold=50)
workflow.preprocess(threshold=50, n_jobs=1)
assert len(workflow.u_nk_list) == 0

def test_no_dHdl_preprocess(self, workflow, monkeypatch):
monkeypatch.setattr(workflow, "dHdl_list", [])
monkeypatch.setattr(workflow, "u_nk_sample_list", [])
monkeypatch.setattr(workflow, "dHdl_sample_list", [])
workflow.preprocess(threshold=50)
workflow.preprocess(threshold=50, n_jobs=1)
assert len(workflow.dHdl_list) == 0


Expand Down Expand Up @@ -407,7 +421,7 @@ def workflow(tmp_path_factory):
T=298.0,
outdirectory=str(outdir),
)
workflow.read()
workflow.read(n_jobs=1)
workflow.estimate(estimators="TI")
return workflow

Expand Down Expand Up @@ -437,11 +451,66 @@ def workflow(tmp_path_factory):
T=298.0,
outdirectory=str(outdir),
)
workflow.read()
workflow.read(n_jobs=1)
workflow.estimate(estimators="BAR")
return workflow

def test_summary(self, workflow):
        """Test if the summary is right."""
summary = workflow.generate_result()
assert np.isclose(summary["BAR"]["Stages"]["TOTAL"], 1.40405980473, 0.1)


class TestParallel:
@pytest.fixture(scope="class")
def workflow(self, tmp_path_factory):
outdir = tmp_path_factory.mktemp("out")
(outdir / "dhdl_00.xvg").symlink_to(load_ABFE()["data"]["complex"][0])
(outdir / "dhdl_01.xvg").symlink_to(load_ABFE()["data"]["complex"][1])
workflow = ABFE(
units="kcal/mol",
software="GROMACS",
dir=str(outdir),
prefix="dhdl",
suffix="xvg",
T=310,
)
workflow.read(n_jobs=1)
workflow.preprocess(n_jobs=1)
return workflow

@pytest.fixture(scope="class")
def parallel_workflow(self, tmp_path_factory):
outdir = tmp_path_factory.mktemp("out")
(outdir / "dhdl_00.xvg").symlink_to(load_ABFE()["data"]["complex"][0])
(outdir / "dhdl_01.xvg").symlink_to(load_ABFE()["data"]["complex"][1])
workflow = ABFE(
units="kcal/mol",
software="GROMACS",
dir=str(outdir),
prefix="dhdl",
suffix="xvg",
T=310,
)
with parallel_config(backend="threading"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find

Suggested change
with parallel_config(backend="threading"):
with joblib.parallel_config(backend="threading"):

clearer when quickly reading the code.

# The default backend is "loky", which is more robust but somehow didn't
# play well with pytest, but "loky" is perfectly fine outside pytest.
workflow.read(n_jobs=2)
workflow.preprocess(n_jobs=2)
return workflow

def test_read(self, workflow, parallel_workflow):
pd.testing.assert_frame_equal(
workflow.u_nk_list[0], parallel_workflow.u_nk_list[0]
)
pd.testing.assert_frame_equal(
workflow.u_nk_list[1], parallel_workflow.u_nk_list[1]
)

def test_preprocess(self, workflow, parallel_workflow):
pd.testing.assert_frame_equal(
workflow.u_nk_sample_list[0], parallel_workflow.u_nk_sample_list[0]
)
pd.testing.assert_frame_equal(
workflow.u_nk_sample_list[1], parallel_workflow.u_nk_sample_list[1]
)
Loading
Loading