From 2dcca67fb7a8d7c270a0031c2df7c018c786015b Mon Sep 17 00:00:00 2001 From: kono Date: Thu, 3 Oct 2024 17:28:16 +0000 Subject: [PATCH 1/2] mod: simulator to bidirectional --- pacs/mdrun/Cycle.py | 34 ++++-- pacs/mdrun/simulator/gromacs.py | 42 +++++--- pacs/mdrun/simulator/superSimulator.py | 50 +++++---- pacs/models/settings.py | 144 +++++++++++++++++-------- 4 files changed, 187 insertions(+), 83 deletions(-) diff --git a/pacs/mdrun/Cycle.py b/pacs/mdrun/Cycle.py index bf49066..f36007b 100644 --- a/pacs/mdrun/Cycle.py +++ b/pacs/mdrun/Cycle.py @@ -70,14 +70,26 @@ def prepare_trial(self) -> None: this is a method for preparing the trial, cycle 0 """ self.prepare_next_cycle(cycle=-1, replica=1) - extension = Path(self.settings.structure).suffix - dir = self.settings.each_replica(_cycle=0, _replica=1) - cmd_cp = f"cp {self.settings.structure} {dir}/input{extension}" + extension = Path(self.settings.structure_back).suffix + + # fore + dir = self.settings.each_replica(_cycle=0, _direction="fore", _replica=1) + cmd_cp = f"cp {self.settings.structure_fore} {dir}/input{extension}" + res_cp = subprocess.run(cmd_cp, shell=True) + if res_cp.returncode != 0: + LOGGER.error("error occurred at cp command") + LOGGER.error(f"check the authority of {dir}/") + exit(1) + + # back + dir = self.settings.each_replica(_cycle=0, _direction="back", _replica=1) + cmd_cp = f"cp {self.settings.structure_back} {dir}/input{extension}" res_cp = subprocess.run(cmd_cp, shell=True) if res_cp.returncode != 0: LOGGER.error("error occurred at cp command") LOGGER.error(f"check the authority of {dir}/") exit(1) + tmp = self.settings.n_replica self.settings.n_replica = 1 # create version file of pacstk @@ -114,14 +126,22 @@ def make_dir(path: Path) -> None: next_cycle_dir = Path(f"{self.settings.each_cycle(_cycle=cycle + 1)}") make_dir(next_cycle_dir / "summary") + for direction in ["fore", "back"]: + self.prepare_next_direction(cycle=cycle, direction=direction, replica=replica) + + def prepare_next_direction(self, cycle: int, direction: str, replica: int) -> None: + next_direction_dir = Path( + f"{self.settings.each_direction(_cycle=cycle + 1, _direction=direction)}" + ) + make_dir(next_direction_dir / "summary") for rep in range(1, replica + 1): - make_dir(next_cycle_dir / f"replica{rep:03}") - - cmd_touch = f"touch {next_cycle_dir}/summary/progress.log" + make_dir(next_direction_dir / f"replica{rep:03}") + + cmd_touch = f"touch {next_direction_dir}/summary/progress.log" res_touch = subprocess.run(cmd_touch, shell=True) if res_touch.returncode != 0: LOGGER.error("error occurred at touch command") - LOGGER.error(f"check authority of {next_cycle_dir}/summary/") + LOGGER.error(f"check authority of {next_direction_dir}/summary/") exit(1) def run_md(self) -> None: diff --git a/pacs/mdrun/simulator/gromacs.py b/pacs/mdrun/simulator/gromacs.py index a1ecebf..15e3407 100644 --- a/pacs/mdrun/simulator/gromacs.py +++ b/pacs/mdrun/simulator/gromacs.py @@ -12,9 +12,9 @@ class GROMACS(SuperSimulator): - def run_md(self, settings: MDsettings, cycle: int, replica: int) -> None: - dir = settings.each_replica(_cycle=cycle, _replica=replica) - self.run_grompp(settings, dir) + def run_md(self, settings: MDsettings, cycle: int, direction:str, replica: int) -> None: + dir = settings.each_replica(_cycle=cycle, _direction=direction, _replica=replica) + self.run_grompp(settings, dir, direction) cmd_mdrun = f"exec {settings.cmd_mpi} {settings.cmd_serial} \ -deffnm {dir}/prd 1> {dir}/mdrun.log 2>&1" # NOQA: E221 @@ -43,15 +43,29 @@ def run_md(self, settings: MDsettings, cycle: int, replica: int) -> None: LOGGER.error(f"see {dir}/mdrun.log and {dir}/prd.log") exit(1) - def run_grompp(self, settings: MDsettings, dir: str) -> None: - cmd_grompp = f"exec {settings.cmd_gmx} grompp \ - -f {settings.mdconf} \ - -o {dir}/prd.tpr \ - -p {settings.topology} \ - -c {dir}/input.gro \ - -n {settings.index_file} \ - -po {dir}/mdout.mdp \ - -maxwarn 10 1> {dir}/grompp.log 2>&1" # NOQA: E221 + def run_grompp(self, settings: MDsettings, dir: str, direction: str) -> None: + if direction == "fore": + cmd_grompp = f"exec {settings.cmd_gmx} grompp \ + -f {settings.mdconf_fore} \ + -o {dir}/prd.tpr \ + -p {settings.topology_fore} \ + -c {dir}/input.gro \ + -n {settings.index_file_fore} \ + -po {dir}/mdout.mdp \ + -maxwarn 10 1> {dir}/grompp.log 2>&1" # NOQA: E221 + elif direction == "back": + cmd_grompp = f"exec {settings.cmd_gmx} grompp \ + -f {settings.mdconf_back} \ + -o {dir}/prd.tpr \ + -p {settings.topology_back} \ + -c {dir}/input.gro \ + -n {settings.index_file_back} \ + -po {dir}/mdout.mdp \ + -maxwarn 10 1> {dir}/grompp.log 2>&1" + else: + LOGGER.error("direction must be fore or back") + exit(1) + res_grompp = subprocess.run(cmd_grompp, shell=True) if res_grompp.returncode != 0: LOGGER.error("error occurred at grompp command") @@ -59,10 +73,10 @@ def run_grompp(self, settings: MDsettings, dir: str) -> None: exit(1) def run_MPI( - self, settings: MDsettings, cycle: int, groupreplica: List[int] + self, settings: MDsettings, cycle: int, direction: str, groupreplica: List[int] ) -> None: groupdir = [ - settings.each_replica(_cycle=cycle, _replica=replica) + settings.each_replica(_cycle=cycle, _direction=direction, _replica=replica) for replica in groupreplica ] diff --git a/pacs/mdrun/simulator/superSimulator.py b/pacs/mdrun/simulator/superSimulator.py index 7180254..4dd8772 100644 --- a/pacs/mdrun/simulator/superSimulator.py +++ b/pacs/mdrun/simulator/superSimulator.py @@ -13,26 +13,30 @@ @dataclasses.dataclass class SuperSimulator(metaclass=ABCMeta): @abstractmethod - def run_md(self, settings: MDsettings, cycle: int, replica: int) -> None: + def run_md(self, settings: MDsettings, cycle: int, direction: str, replica: int) -> None: pass @abstractmethod def run_MPI( - self, settings: MDsettings, cycle: int, groupreplica: List[int] + self, settings: MDsettings, cycle: int, direction: str, groupreplica: List[int] ) -> None: pass def run_parallel(self, settings: MDsettings, cycle: int) -> None: + self.run_parallel_one_way(settings, cycle, "fore") + self.run_parallel_one_way(settings, cycle, "back") + + def run_parallel_one_way(self, settings: MDsettings, cycle: int, direction: str) -> None: if settings.n_parallel == 1 or cycle == 0: - self.run_serial(settings, cycle) + self.run_serial(settings, cycle, direction) return if settings.cmd_mpi != "": if settings.simulator == "gromacs" or settings.simulator == "amber": - self.run_parallel_MPI(settings, cycle) + self.run_parallel_MPI(settings, cycle, direction) return - not_finished_replicas = self.not_finished_replicas(settings, cycle) + not_finished_replicas = self.not_finished_replicas(settings, cycle, direction) if len(not_finished_replicas) == 0: return @@ -64,19 +68,23 @@ def run_parallel(self, settings: MDsettings, cycle: int) -> None: for replica in not_finished_replicas[ i * n_parallel : min((i + 1) * n_parallel, rest) ]: - self.record_finished(settings, cycle, replica) + self.record_finished(settings, cycle, direction, replica) def run_serial(self, settings: MDsettings, cycle: int) -> None: - not_finished_replicas = self.not_finished_replicas(settings, cycle) + self.run_serial_one_way(settings, cycle, "fore") + self.run_serial_one_way(settings, cycle, "back") + + def run_serial_one_way(self, settings: MDsettings, cycle: int, direction: str) -> None: + not_finished_replicas = self.not_finished_replicas(settings, cycle, direction) if len(not_finished_replicas) == 0: return for replica in not_finished_replicas: - self.run_md(settings, cycle, replica) - self.record_finished(settings, cycle, replica) + self.run_md(settings, cycle, direction, replica) + self.record_finished(settings, cycle, direction, replica) - def not_finished_replicas(self, settings: MDsettings, cycle: int) -> List[int]: + def not_finished_replicas(self, settings: MDsettings, cycle: int, direction: str) -> List[int]: finished_replicas = [False] * settings.n_replica - dir = settings.each_cycle(_cycle=cycle) + dir = settings.each_direction(_cycle=cycle, _direction=direction) with open(f"{dir}/summary/progress.log", "r") as f: for line in f: if "replica" not in line: @@ -89,19 +97,23 @@ def not_finished_replicas(self, settings: MDsettings, cycle: int) -> List[int]: ] return not_finished_replicas - def record_finished(self, settings: MDsettings, cycle: int, replica: int) -> None: - dir = settings.each_cycle(_cycle=cycle) + def record_finished(self, settings: MDsettings, cycle: int, direction: str, replica: int) -> None: + dir = settings.each_direction(_cycle=cycle, _direction=direction) logger = generate_logger(f"c{cycle}rep{replica}", f"{dir}/summary/progress.log") logger.info(f"replica{replica:03} done") close_logger(logger) def run_parallel_MPI(self, settings: MDsettings, cycle: int) -> None: - not_finished_replicas = self.not_finished_replicas(settings, cycle) + self.run_parallel_MPI_one_way(settings, cycle, "fore") + self.run_parallel_MPI_one_way(settings, cycle, "back") + + def run_parallel_MPI_one_way(self, settings: MDsettings, cycle: int, direction: str) -> None: + not_finished_replicas = self.not_finished_replicas(settings, cycle, direction) if len(not_finished_replicas) == 0: return if len(not_finished_replicas) == 1: - self.run_md(settings, cycle, not_finished_replicas[0]) - self.record_finished(settings, cycle, not_finished_replicas[0]) + self.run_md(settings, cycle, direction, not_finished_replicas[0]) + self.record_finished(settings, cycle, direction, not_finished_replicas[0]) return n_parallel = settings.n_parallel @@ -114,8 +126,8 @@ def run_parallel_MPI(self, settings: MDsettings, cycle: int) -> None: ]: groupreplica.append(replica) if len(groupreplica) != n_parallel: - self.run_serial(settings, cycle) + self.run_serial(settings, cycle, direction) else: - self.run_MPI(settings, cycle, groupreplica) + self.run_MPI(settings, cycle, direction, groupreplica) for replica in groupreplica: - self.record_finished(settings, cycle, replica) + self.record_finished(settings, cycle, direction, replica) diff --git a/pacs/models/settings.py b/pacs/models/settings.py index df53a41..b52492c 100644 --- a/pacs/models/settings.py +++ b/pacs/models/settings.py @@ -53,10 +53,16 @@ class MDsettings: # simulator simulator: str = None - structure: Path = None - topology: Path = None - mdconf: Path = None - index_file: Path = None + structure_fore: Path = None + topology_fore: Path = None + mdconf_fore: Path = None + index_file_fore: Path = None + + structure_back: Path = None + topology_back: Path = None + mdconf_back: Path = None + index_file_back: Path = None + trajectory_extension: str = None cmd_mpi: str = "" cmd_parallel: str = None @@ -67,7 +73,6 @@ class MDsettings: type: str = None threshold: float = None skip_frame: int = 1 - reference: Path = None selection1: str = None selection2: str = None selection3: str = None @@ -83,7 +88,8 @@ class MDsettings: # rmmol option rmmol: bool = False - keep_selection: str = None + keep_selection_fore: str = None + keep_selection_back: str = None # rmfile option rmfile: bool = False @@ -92,18 +98,35 @@ class MDsettings: nojump: bool = False def each_replica( - self, _trial: int = None, _cycle: int = None, _replica: int = None + self, _trial: int = None, _cycle: int = None, _direction: str = None, _replica: int = None ) -> str: if _trial is None: _trial = self.trial if _cycle is None: _cycle = self.max_cycle + if _direction is None: + _direction = self.direction if _replica is None: _replica = self.n_replica + if _direction not in ["fore", "back"]: + LOGGER.error(f"direction={_direction} is given.") + exit(1) return ( - f"{self.working_dir}/trial{_trial:03}/cycle{_cycle:03}/replica{_replica:03}" + f"{self.working_dir}/trial{_trial:03}/cycle{_cycle:03}/{_direction}/replica{_replica:03}" ) + def each_direction(self, _trial: int = None, _cycle: int = None, _direction: str = None) -> str: + if _trial is None: + _trial = self.trial + if _cycle is None: + _cycle = self.cycle + if _direction is None: + _direction = self.direction + if _direction not in ["fore", "back"]: + LOGGER.error(f"direction={_direction} is given.") + exit(1) + return f"{self.working_dir}/trial{_trial:03}/cycle{_cycle:03}/{_direction}" + def each_cycle(self, _trial: int = None, _cycle: int = None) -> str: if _trial is None: _trial = self.trial @@ -119,8 +142,8 @@ def each_trial(self, _trial: int = None) -> str: def log_file(self) -> str: return f"{self.working_dir}/trial{self.trial:03}.log" - def rmmol_top(self) -> None: - c0r1_dir = self.each_replica(_cycle=0, _replica=1) + def rmmol_top(self, direction: str) -> None: + c0r1_dir = self.each_replica(_cycle=0, _direction=direction, _replica=1) self.top_mdtraj = f"{c0r1_dir}/rmmol_top.pdb" def check_bool(self, value: str) -> bool: @@ -132,9 +155,9 @@ def check_bool(self, value: str) -> bool: def update(self): if self.selection3 is None: - self.selection3 = self.selection1 + LOGGER.error("selection3 is None. Please set selection3.") if self.selection4 is None: - self.selection4 = self.selection2 + LOGGER.error("selection4 is None. Please set selection4.") if self.cmd_parallel is None: if self.n_parallel > 1: LOGGER.warning( @@ -199,13 +222,13 @@ def check(self) -> None: exit(1) if self.type not in [ - "target", - "association", - "dissociation", - "rmsd", - "ee", - "a_d", - "template", + "bd_rmsd", + # "association", + # "dissociation", + # "rmsd", + # "ee", + # "a_d", + # "template", ]: LOGGER.error(f"{self.type} is not supported") exit(1) @@ -227,10 +250,11 @@ def check(self) -> None: # threshold check if self.threshold is None and self.type in [ - "target", - "association", - "dissociation", - "rmsd", + "bd_rmsd", + # "target", + # "association", + # "dissociation", + # "rmsd", ]: LOGGER.error(f"{self.type} requires threshold") exit(1) @@ -270,30 +294,57 @@ def check(self) -> None: LOGGER.error("selection2 is required for dissociation") exit(1) - self.update() + self.update() # update selection3, 4 + + if self.topology_fore is None: + LOGGER.error("topology_fore is None") + exit(1) + if self.structure_fore is None: + LOGGER.error("structure_fore is None") + exit(1) + if self.mdconf_fore is None: + LOGGER.error("mdconf_fore is None") + exit(1) - if self.topology is None: - LOGGER.error("topology is None") + if self.topology_back is None: + LOGGER.error("topology_back is None") + exit(1) + if self.structure_back is None: + LOGGER.error("structure_back is None") exit(1) - if self.structure is None: - LOGGER.error("structure is None") + if self.mdconf_back is None: + LOGGER.error("mdconf_back is None") exit(1) - if self.mdconf is None: - LOGGER.error("mdconf is None") + + if self.working_dir is None: + LOGGER.error("working_dir is None") exit(1) if self.trajectory_extension is None: LOGGER.error("trajectory_extension is None") exit(1) - self.structure = Path(self.structure) - self.topology = Path(self.topology) - self.mdconf = Path(self.mdconf) + + self.structure_fore = Path(self.structure_fore) + self.topology_fore = Path(self.topology_fore) + self.mdconf_fore = Path(self.mdconf_fore) + + self.structure_back = Path(self.structure_back) + self.topology_back = Path(self.topology_back) + self.mdconf_back = Path(self.mdconf_back) + self.working_dir = Path(self.working_dir) if self.simulator == "gromacs": - self.index_file = Path(self.index_file) + if self.index_file_fore is None: + LOGGER.error("index_file_fore is None") + exit(1) + if self.index_file_back is None: + LOGGER.error("index_file_back is None") + exit(1) + self.index_file_fore = Path(self.index_file_fore) + self.index_file_back = Path(self.index_file_back) # analyzer - if self.type in ["target", "rmsd"]: - self.reference = Path(self.reference) + # if self.type in ["bd_rmsd"]: + # self.reference = Path(self.reference) # relative path to absolute path """ @@ -314,8 +365,8 @@ def check(self) -> None: self.cmd_gmx = re.findall(r"\S+", self.cmd_serial)[0] # rmmol - if self.rmmol and self.keep_selection is None: - LOGGER.error("keep_selection is necessary if rmmol is True") + if self.rmmol and (self.keep_selection_fore is None or self.keep_selection_back is None): + LOGGER.error("keep_selection_[fore/back] are necessary if rmmol is True") exit(1) # top_mdtraj @@ -375,11 +426,16 @@ def check(self) -> None: ".dtr", ".gsd", ] - tmp = self.structure.suffix - if tmp in init_structure_extension: - self.structure_extension = tmp + tmp_fore = self.structure_fore.suffix + tmp_back = self.structure_back.suffix + if tmp_fore != tmp_back: + # for simplicity, the extension of the structure file must be the same + LOGGER.error("structure_fore and structure_back must have the same extension.") + exit(1) + if tmp_back in init_structure_extension: + self.structure_extension = tmp_back else: - LOGGER.error("cannot start PaCSMD with this STRUCTURE.") + LOGGER.error(f"cannot start PaCSMD with this STRUCTURE with the extension {tmp_back}") exit(1) @@ -389,12 +445,14 @@ class Snapshot: information about a snapshot of a trajectory Attributes: + direction (str): direction of simulation [fore, back] replica (int): id of replica frame (int): id of frame cv (float or List[float]): value of collective variable """ - def __init__(self, replica: int, frame: int, cv: float): + def __init__(self, direction: str, replica: int, frame: int, cv: float): + self.direction = direction self.replica = replica self.frame = frame self.cv = cv From 210c770c8413f8521ad983ad194ce46cdbcf6420 Mon Sep 17 00:00:00 2001 From: kono Date: Fri, 4 Oct 2024 16:41:00 +0000 Subject: [PATCH 2/2] update analyzer, exporter to bidirectional --- pacs/__main__.py | 26 +-- pacs/mdrun/Cycle.py | 4 + pacs/mdrun/analyzer/bd_rmsd.py | 107 ++++++++++ pacs/mdrun/analyzer/superAnalyzer.py | 240 ++++++++++++++++----- pacs/mdrun/exporter/gromacs.py | 282 +++++++++++++------------ pacs/mdrun/exporter/superExporter.py | 6 +- pacs/mdrun/simulator/superSimulator.py | 4 +- pacs/models/settings.py | 86 ++++++-- 8 files changed, 529 insertions(+), 226 deletions(-) create mode 100644 pacs/mdrun/analyzer/bd_rmsd.py diff --git a/pacs/__main__.py b/pacs/__main__.py index 5a8001a..3c4cc26 100644 --- a/pacs/__main__.py +++ b/pacs/__main__.py @@ -1,11 +1,12 @@ from typing import Tuple from ._version import __version__ -from .mdrun.analyzer.a_d import A_D -from .mdrun.analyzer.association import Association -from .mdrun.analyzer.dissociation import Dissociation -from .mdrun.analyzer.ee import EdgeExpansion -from .mdrun.analyzer.rmsd import RMSD +# from .mdrun.analyzer.a_d import A_D +# from .mdrun.analyzer.association import Association +# from .mdrun.analyzer.dissociation import Dissociation +# from .mdrun.analyzer.ee import EdgeExpansion +# from .mdrun.analyzer.rmsd import RMSD +from .mdrun.analyzer.bd_rmsd import BD_RMSD from .mdrun.analyzer.superAnalyzer import SuperAnalyzer from .mdrun.analyzer.target import Target from .mdrun.analyzer.template import Template @@ -40,13 +41,14 @@ def prepare_md( }.get(settings.simulator) analyzer: SuperAnalyzer = { - "target": Target(), - "dissociation": Dissociation(), - "association": Association(), - "rmsd": RMSD(), - "ee": EdgeExpansion(), - "a_d": A_D(), - "template": Template(), + "bd_rmsd": BD_RMSD(), + # "target": Target(), + # "dissociation": Dissociation(), + # "association": Association(), + # "rmsd": RMSD(), + # "ee": EdgeExpansion(), + # "a_d": A_D(), + # "template": Template(), }.get(settings.type) exporter: SuperExporter = { diff --git a/pacs/mdrun/Cycle.py b/pacs/mdrun/Cycle.py index f36007b..769fe20 100644 --- a/pacs/mdrun/Cycle.py +++ b/pacs/mdrun/Cycle.py @@ -130,6 +130,10 @@ def make_dir(path: Path) -> None: self.prepare_next_direction(cycle=cycle, direction=direction, replica=replica) def prepare_next_direction(self, cycle: int, direction: str, replica: int) -> None: + def make_dir(path: Path) -> None: + if not path.exists(): + path.mkdir(parents=True) + next_direction_dir = Path( f"{self.settings.each_direction(_cycle=cycle + 1, _direction=direction)}" ) diff --git a/pacs/mdrun/analyzer/bd_rmsd.py b/pacs/mdrun/analyzer/bd_rmsd.py new file mode 100644 index 0000000..889788d --- /dev/null +++ b/pacs/mdrun/analyzer/bd_rmsd.py @@ -0,0 +1,107 @@ +import subprocess +from typing import List + +import numpy as np +from pacs.mdrun.analyzer.superAnalyzer import SuperAnalyzer +from pacs.models.settings import MDsettings, Snapshot, ScoresInCycle, ScoresInOnePair +from pacs.utils.logger import generate_logger + +LOGGER = generate_logger(__name__) + + +class BD_RMSD(SuperAnalyzer): + def calculate_scores_in_one_pair( + self, settings: MDsettings, cycle: int, fore_replica: int, back_replica: int, send_rev + ) -> ScoresInOnePair: + if settings.analyzer == "mdtraj": + ret = self.cal_by_mdtraj(settings, cycle, fore_replica, back_replica) + elif settings.analyzer == "gromacs": + raise NotImplementedError + elif settings.analyzer == "cpptraj": + raise NotImplementedError + else: + raise NotImplementedError + + LOGGER.info(f"cycle {cycle} replica ({fore_replica}, {back_replica}) calculated.") + if send_rev is not None: + send_rev.send(ret) + return ret + + def aggregate(self, settings: MDsettings, scores_in_cycle: ScoresInCycle, direction: str) -> np.ndarray: + aggr_func = np.mean + + if direction == "fore": + return aggr_func(scores_in_cycle.cv_data, axis=(1, 3)) + elif direction == "back": + return aggr_func(scores_in_cycle.cv_data, axis=(0, 2)) + else: + LOGGER.error(f"invalid direction: {direction}") + + def ranking(self, settings: MDsettings, CVs: List[Snapshot]) -> List[Snapshot]: + sorted_cv = sorted(CVs, key=lambda x: x.cv, reverse=False) + return sorted_cv + + def is_threshold(self, settings: MDsettings, CVs: List[Snapshot] = None) -> bool: + if CVs is None: + CVs = self.CVs + return CVs[0].cv > settings.threshold + + def cal_by_mdtraj(self, settings: MDsettings, cycle: int, fore_replica: int, back_replica: int) -> ScoresInOnePair: + import mdtraj as md + + fore_dir = settings.each_replica(_cycle=cycle, _direction="fore", _replica=fore_replica) + back_dir = settings.each_replica(_cycle=cycle, _direction="back", _replica=back_replica) + fore_trj = md.load( + f"{fore_dir}/prd{settings.trajectory_extension}", + top=settings.top_mdtraj_fore, + ) + back_trj = md.load( + f"{back_dir}/prd{settings.trajectory_extension}", + top=settings.top_mdtraj_back, + ) + + # slice (exclude the first frame for gromacs) + if settings.simulator == "gromacs": + n_frames_fore = fore_trj.n_frames + n_frames_back = back_trj.n_frames + fore_trj = fore_trj.slice(np.arange(1, n_frames_fore)) + back_trj = back_trj.slice(np.arange(1, n_frames_back)) + n_frames_fore = fore_trj.n_frames + n_frames_back = back_trj.n_frames + + # fit and compute rmsd + rmsd = np.zeros((n_frames_fore, n_frames_back)) + sel1_arr = fore_trj.top.select(settings.selection1) + sel2_arr = fore_trj.top.select(settings.selection2) + sel3_arr = back_trj.top.select(settings.selection3) + sel4_arr = back_trj.top.select(settings.selection4) + + for frame_i_back in range(n_frames_back): + back_trj.superpose( + fore_trj, + frame_i_back, + atom_indices=sel1_arr, + ref_atom_indices=sel3_arr, + ) + # もうちょい効率化できるかも + for frame_i_fore in range(n_frames_fore): + tmp = np.sqrt( + 3 + * np.mean( + np.square( + fore_trj.xyz[frame_i_fore, sel2_arr] + - back_trj.xyz[frame_i_back, sel4_arr] + ), + axis=(0, 1), # xyzのみで平均を取る + ) + ) + rmsd[frame_i_fore, frame_i_back] = tmp + + scores_in_one_pair = ScoresInOnePair( + cycle=cycle, + fore_replica=fore_replica, + back_replica=back_replica, + cv_data=rmsd, + ) + + return scores_in_one_pair \ No newline at end of file diff --git a/pacs/mdrun/analyzer/superAnalyzer.py b/pacs/mdrun/analyzer/superAnalyzer.py index 447efa1..73583fd 100644 --- a/pacs/mdrun/analyzer/superAnalyzer.py +++ b/pacs/mdrun/analyzer/superAnalyzer.py @@ -6,7 +6,7 @@ from typing import List import numpy as np -from pacs.models.settings import MDsettings, Snapshot +from pacs.models.settings import MDsettings, Snapshot, ScoresInCycle, ScoresInOnePair from pacs.utils.logger import generate_logger LOGGER = generate_logger(__name__) @@ -18,7 +18,13 @@ class SuperAnalyzer(metaclass=ABCMeta): CVs: List[float] = None @abstractmethod - def calculate_cv(self, settings: MDsettings, cycle: int) -> List[float]: + def calculate_scores_in_one_pair( + self, settings: MDsettings, cycle: int, fore_replica: int, back_replica: int, send_rev + ) -> ScoresInOnePair: + pass + + @abstractmethod + def aggregate(self, settings: MDsettings, scores_in_cycle: ScoresInCycle, direction: str) -> np.ndarray: pass @abstractmethod @@ -30,55 +36,61 @@ def is_threshold(self, settings: MDsettings, CVs: List[Snapshot] = None) -> bool pass def write_cv_to_file( - self, output_file: str, cv_arr: List[Snapshot], settings: MDsettings, cycle: int + self, output_file: str, cv_arr: List[Snapshot], settings: MDsettings, cycle: int, direction: str ) -> None: - dir = settings.each_cycle(_cycle=cycle) + dir = settings.each_direction(_cycle=cycle, _direction=direction) with open(f"{dir}/summary/{output_file}", "w") as f: for snapshot in cv_arr: f.write(f"{snapshot}" + "\n") def analyze(self, settings: MDsettings, cycle: int) -> List[Snapshot]: - dir = settings.each_cycle(_cycle=cycle) - if Path(f"{dir}/summary/cv_ranked.log").exists(): - pattern1 = r"replica (\d+) frame (\d+) cv \[([-\d.\s]+)\]" - pattern2 = r"replica (\d+) frame (\d+) cv ([-\d.]+)" - results = [] - with open(f"{dir}/summary/cv_ranked.log", "r") as f: - for line in f: - match1 = re.search(pattern1, line) - match2 = re.search(pattern2, line) - if match1: - replica = int(match1.group(1)) - frame = int(match1.group(2)) - cv_values = [float(x) for x in match1.group(3).split()] - elif match2: - replica = int(match2.group(1)) - frame = int(match2.group(2)) - cv_values = float(match2.group(3)) - else: - LOGGER.error( - "pattern matching failed. please check the log file." - ) - exit(1) - - results.append(Snapshot(int(replica), int(frame), cv_values)) - LOGGER.info("analyzer was skipped") - return results - + # call rust functios here + """ + 1. extract the fitting/rmsd part and pass to Rust (fore/back) (remove the center of mass of fitting part by transposing the coordinates) + 2. call rust function to calculate the rmsd matrices for each (fore, back) replica pairs and store them in the disk (shape = (n_frames, n_frames)) + 3. read the results from the disk an create a large matrix of rmsd values between all snapshots (shape = (fore_n_replicas, back_n_replicas, n_frames, n_frames)) + 4. 全体の行列をもとに、fore/backの各snapshotのrmsdの平均を計算 + + 1, 2はセットとして、並列化できるようにしておく + cvをバイナリとしてcycle***/summary/以下に書くのはstep123, テキストとしてcycle***/[fore/back]/summaryに書くのはstep4 + 他の評価関数の場合にも使えるよう、一般化しておく(これらはsuperAnalyzer内に書く) + - あるfore・backのreplicaの組み合わせに対して、そのreplicaの各snapshotの評価関数の値を計算した結果をファイルに書く関数 + - その関数を全ペアに対して回す関数 + - レプリカペアごとになっている.npyファイルを読み込んで、全体の行列を作る関数 + 各評価関数ごとにファイル分けして書くべきこと + - あるfore・backのreplicaの組み合わせに対して、そのreplicaの各snapshotの評価関数の値を計算し、arrで返す関数(rust) + - 全体の行列をもとに、各replicaの各snapshotの評価関数の平均を計算する関数 + """ + + replica_pairs = [ + (fore_replica, back_replica) + for fore_replica in range(1, settings.n_replica+1) + for back_replica in range(1, settings.n_replica+1) + ] + + # calc for the first replica pair + fore_replica, back_replica = replica_pairs[0] + scores_in_pair = self.calculate_scores_in_one_pair(settings, cycle, fore_replica, back_replica, None) + scores_in_cycle = ScoresInCycle(cycle, settings.n_replica, scores_in_pair.n_frames_fore, scores_in_pair.n_frames_back) + scores_in_cycle.add(scores_in_pair) + n_frames_fore = scores_in_pair.n_frames_fore + n_frame_back = scores_in_pair.n_frames_back + replica_pairs = replica_pairs[1:] + + # calc for the rest replica pairs pipe_list = [] - n_loop = (settings.n_replica + settings.n_parallel - 1) // settings.n_parallel - replicas = [x + 1 for x in range(settings.n_replica)] + n_loop = (len(replica_pairs) + settings.n_parallel - 1) // settings.n_parallel + for i in range(n_loop): job_list = [] - for replica in replicas[ - i - * settings.n_parallel : min( - (i + 1) * settings.n_parallel, settings.n_replica + for fore_replica, back_replica in replica_pairs[ + i * settings.n_parallel : min( + (i + 1) * settings.n_parallel, len(replica_pairs) ) ]: get_rev, send_rev = mp.Pipe(False) p = mp.Process( - target=self.calculate_cv, args=(settings, cycle, replica, send_rev) + target=self.calculate_scores_in_one_pair, args=(settings, cycle, fore_replica, back_replica, send_rev) ) job_list.append(p) pipe_list.append(get_rev) @@ -93,32 +105,144 @@ def analyze(self, settings: MDsettings, cycle: int) -> List[Snapshot]: for proc in job_list: proc.close() - cv_arr = [x.recv() for x in pipe_list] - assert len(cv_arr) == settings.n_replica + for pipe in pipe_list: + scores_in_pair = pipe.recv() + scores_in_cycle.add(scores_in_pair) + + # save the scores_in_cycle to the disk + path = f"{settings.each_cycle(_cycle=cycle)}/summary/scores_in_cycle.npy" + scores_in_cycle.save(path) + + # aggregate the scores_in_cycle to the CVs + cv_arr_fore = self.aggregate(settings, scores_in_cycle, "fore") + cv_arr_back = self.aggregate(settings, scores_in_cycle, "back") - cv_arr = np.array(cv_arr) - if len(cv_arr.shape) == 2: - cv_arr = cv_arr.flatten() - else: - cv_arr = cv_arr.reshape(-1, cv_arr.shape[-1]) - n_frame = len(cv_arr) // settings.n_replica + results_fore: List[Snapshot] = [] + for rep in range(1, settings.n_replica + 1): + for frame in range(0, n_frames_fore): + # Exclude the first frame of the trajectory file in gromacs + # because it is the initial structure + if settings.simulator == "gromacs" and frame == 0: + continue + snapshot = Snapshot("fore", rep, frame, cv_arr_fore[rep-1, frame]) + results_fore.append(snapshot) - results: List[Snapshot] = [] - iter = 0 + results_back: List[Snapshot] = [] for rep in range(1, settings.n_replica + 1): - for frame in range(0, n_frame): - snapshot = Snapshot(rep, frame, cv_arr[iter]) - iter += 1 + for frame in range(0, n_frame_back): # Exclude the first frame of the trajectory file in gromacs # because it is the initial structure if settings.simulator == "gromacs" and frame == 0: continue - results.append(snapshot) + snapshot = Snapshot("back", rep, frame, cv_arr_back[rep-1, frame]) + results_back.append(snapshot) + + self.write_cv_to_file("cv.log", results_fore, settings, cycle, "fore") + self.write_cv_to_file("cv.log", results_back, settings, cycle, "back") + + self.fore_CVs = results_fore[:: settings.skip_frame] + self.back_CVs = results_back[:: settings.skip_frame] + + self.fore_CVs = self.ranking(settings, self.fore_CVs) + self.back_CVs = self.ranking(settings, self.back_CVs) + + self.write_cv_to_file("cv_ranked.log", self.fore_CVs, settings, cycle, "fore") + self.write_cv_to_file("cv_ranked.log", self.back_CVs, settings, cycle, "back") + + LOGGER.info(f"The top ranking CV forward is {self.fore_CVs[0]}") + LOGGER.info(f"The top ranking CV backward is {self.back_CVs[0]}") + + return self.fore_CVs, self.back_CVs + + + # def analyze(self, settings: MDsettings, cycle: int) -> List[Snapshot]: + # self.analyze(settings, cycle, "fore") + # self.analyze(settings, cycle, "back") + + # def analyze_one_way(self, settings: MDsettings, cycle: int, direction: str) -> List[Snapshot]: + # dir = settings.each_direction(_cycle=cycle, _direction=direction) + # if Path(f"{dir}/summary/cv_ranked.log").exists(): + # pattern1 = r"replica (\d+) frame (\d+) cv \[([-\d.\s]+)\]" + # pattern2 = r"replica (\d+) frame (\d+) cv ([-\d.]+)" + # results = [] + # with open(f"{dir}/summary/cv_ranked.log", "r") as f: + # for line in f: + # match1 = re.search(pattern1, line) + # match2 = re.search(pattern2, line) + # if match1: + # replica = int(match1.group(1)) + # frame = int(match1.group(2)) + # cv_values = [float(x) for x in match1.group(3).split()] + # elif match2: + # replica = int(match2.group(1)) + # frame = int(match2.group(2)) + # cv_values = float(match2.group(3)) + # else: + # LOGGER.error( + # "pattern matching failed. please check the log file." + # ) + # exit(1) + + # results.append(Snapshot(direction, int(replica), int(frame), cv_values)) + # LOGGER.info("analyzer was skipped") + # return results + + # pipe_list = [] + # n_loop = (settings.n_replica + settings.n_parallel - 1) // settings.n_parallel + # replicas = [x + 1 for x in range(settings.n_replica)] + # for i in range(n_loop): + # job_list = [] + # for replica in replicas[ + # i + # * settings.n_parallel : min( + # (i + 1) * settings.n_parallel, settings.n_replica + # ) + # ]: + # get_rev, send_rev = mp.Pipe(False) + # p = mp.Process( + # target=self.calculate_cv, args=(settings, cycle, replica, send_rev) + # ) + # job_list.append(p) + # pipe_list.append(get_rev) + # p.start() + # for proc in job_list: + # proc.join() + # for proc in job_list: + # if proc.exitcode != 0: + # LOGGER.error("error occurred at child process") + # exit(1) + # # Not necessary, but just in case. + # for proc in job_list: + # proc.close() + + # cv_arr = [x.recv() for x in pipe_list] + # assert len(cv_arr) == settings.n_replica + + # cv_arr = np.array(cv_arr) + # if len(cv_arr.shape) == 2: + # cv_arr = cv_arr.flatten() + # else: + # cv_arr = cv_arr.reshape(-1, cv_arr.shape[-1]) + # n_frame = len(cv_arr) // settings.n_replica + + # results: List[Snapshot] = [] + # iter = 0 + # for rep in range(1, settings.n_replica + 1): + # for frame in range(0, n_frame): + # snapshot = Snapshot(rep, frame, cv_arr[iter]) + # iter += 1 + # # Exclude the first frame of the trajectory file in gromacs + # # because it is the initial structure + # if settings.simulator == "gromacs" and frame == 0: + # continue + # results.append(snapshot) + + # self.write_cv_to_file("cv.log", results, settings, cycle) + # self.CVs = results[:: settings.skip_frame] + # self.CVs = self.ranking(settings, self.CVs) + # self.write_cv_to_file("cv_ranked.log", self.CVs, settings, cycle) + # LOGGER.info(f"The top ranking CV is {self.CVs[0]}") + + # return self.CVs - self.write_cv_to_file("cv.log", results, settings, cycle) - self.CVs = results[:: settings.skip_frame] - self.CVs = self.ranking(settings, self.CVs) - self.write_cv_to_file("cv_ranked.log", self.CVs, settings, cycle) - LOGGER.info(f"The top ranking CV is {self.CVs[0]}") - return self.CVs diff --git a/pacs/mdrun/exporter/gromacs.py b/pacs/mdrun/exporter/gromacs.py index 9d7f4e6..3d23a00 100644 --- a/pacs/mdrun/exporter/gromacs.py +++ b/pacs/mdrun/exporter/gromacs.py @@ -14,7 +14,7 @@ @dataclasses.dataclass class eGromacs(SuperExporter): def export(self, settings: MDsettings, cycle: int) -> None: - if cycle == 0: + if cycle == 0 and settings.analyzer == "gromacs": self.frame_to_time(settings) super().export(settings, cycle) @@ -28,7 +28,8 @@ def export_each( if settings.analyzer == "mdtraj": self.export_by_mdtraj(settings, cycle, replica_rank, results) elif settings.analyzer == "gromacs": - self.export_by_gmx(settings, cycle, replica_rank, results) + # self.export_by_gmx(settings, cycle, replica_rank, results) + raise NotImplementedError else: raise NotImplementedError @@ -38,12 +39,23 @@ def export_by_mdtraj( cycle: int, replica_rank: int, results: List[Snapshot], + ) -> None: + self.export_by_mdtraj_one_way(settings, cycle, "fore", replica_rank, results) + self.export_by_mdtraj_one_way(settings, cycle, "back", replica_rank, results) + + def export_by_mdtraj_one_way( + self, + settings: MDsettings, + cycle: int, + direction: str, + replica_rank: int, + results: List[Snapshot], ) -> None: import mdtraj as md extension = settings.trajectory_extension - from_dir = settings.each_replica( - _cycle=cycle, _replica=results[replica_rank].replica + from_dir = settings.each_direction( + _cycle=cycle, _direction=direction, _replica=results[replica_rank].replica ) selected_frame = md.load_frame( f"{from_dir}/prd{extension}", @@ -51,7 +63,9 @@ def export_by_mdtraj( top=settings.top_mdtraj, ) - out_dir = settings.each_replica(_cycle=cycle + 1, _replica=replica_rank + 1) + out_dir = settings.each_direction( + _cycle=cycle + 1, _direction=direction, _replica=replica_rank + 1 + ) if settings.centering: atom_indices = selected_frame.topology.select(settings.centering_selection) anchors = [ @@ -65,132 +79,132 @@ def export_by_mdtraj( selected_frame.image_molecules(anchor_molecules=anchors, inplace=True) selected_frame.save(f"{out_dir}/input{settings.structure_extension}") - def export_by_gmx( - self, - settings: MDsettings, - cycle: int, - replica_rank: int, - _results: List[Snapshot], - ) -> None: - f2t_dict = self.load_frame_to_time(settings) - - results = [] - dir = settings.each_cycle(_cycle=cycle) - with open(f"{dir}/summary/cv_ranked.log", "r") as f: - for line in f: - _, replica, _, frame, _, _cv = line.split() - results.append(Snapshot(int(replica), int(frame), _cv)) - - for i in range(len(results)): - results[i].frame = f2t_dict[results[i].frame] - - if settings.n_replica > len(results): - LOGGER.error( - f"The total number of frames now is {len(results)}. " - f"This is less than the number of replicas {settings.n_replica}" - ) - exit(1) - - extension = settings.trajectory_extension - from_dir = settings.each_replica( - _cycle=cycle, _replica=results[replica_rank].replica - ) - out_dir = settings.each_replica(_cycle=cycle + 1, _replica=replica_rank + 1) - - # treatment for centering option - if settings.centering is True: - centering_option = "-center" - args_to_trjconv = f"{settings.centering_selection} System" - else: - centering_option = "" - args_to_trjconv = "System" - - # treatment for nojump option - if settings.nojump is True: - pbc_option = "-pbc nojump" - else: - pbc_option = "-pbc whole" - - # First convert trj with -pbc option, and then extract with -b, -e options - # If we do the both at the same time, the -pbc nojump does not work properly - # Output the prd_image_prev_cycle to the next cycle to avoid overwrapping. - cmd_trjconv = f"echo {args_to_trjconv} \ - | {settings.cmd_gmx} trjconv \ - -f {from_dir}/prd{extension} \ - -o {out_dir}/prd_image_prev_cycle{extension} \ - -s {from_dir}/prd.tpr \ - -n {settings.index_file} \ - {pbc_option} \ - {centering_option} \ - 1> {from_dir}/trjconv.log 2>&1" # NOQA: E221 - - res_trjconv = subprocess.run(cmd_trjconv, shell=True) - - if res_trjconv.returncode != 0: - LOGGER.error("error occurred at trjconv command") - LOGGER.error(f"see {from_dir}/trjconv.log") - exit(1) - - cmd_extract = f"echo System \ - | {settings.cmd_gmx} trjconv \ - -f {out_dir}/prd_image_prev_cycle{extension} \ - -o {out_dir}/input{settings.structure_extension} \ - -s {from_dir}/prd.tpr \ - -b {results[replica_rank].frame} \ - -e {results[replica_rank].frame} \ - -novel \ - 1> {from_dir}/extract.log 2>&1" # NOQA: E221 - - res_extract = subprocess.run(cmd_extract, shell=True) - - if res_extract.returncode != 0: - LOGGER.error("error occurred at extract command") - LOGGER.error(f"see {from_dir}/extract.log") - exit(1) - - # remove the intermediate trajectory - res_rm = subprocess.run( - f"rm {out_dir}/prd_image_prev_cycle{extension}", shell=True - ) - if res_rm.returncode != 0: - LOGGER.error("error occurred at rm command") - exit(1) - - def frame_to_time(self, settings: MDsettings) -> None: - # Output correspondence between frame and time to file - dir_0_1 = settings.each_replica(_cycle=0, _replica=1) - if Path(f"{dir_0_1}/frame_time.tsv").exists(): - return - cmd_pseudo_rms = f"echo 0 0 \ - | {settings.cmd_gmx} rms \ - -f {dir_0_1}/prd{settings.trajectory_extension} \ - -s {dir_0_1}/prd.gro \ - -o {dir_0_1}/pseudo_rms.xvg \ - -nomw \ - -xvg none 1> {dir_0_1}/pseudo_rms.log 2>&1" # NOQA: E221 - res_pseudo_rms = subprocess.run(cmd_pseudo_rms, shell=True) - if res_pseudo_rms.returncode != 0: - LOGGER.error("error occurred at pseudo rms command") - LOGGER.error(f"see {dir_0_1}/pseudo_rms.log") - exit(1) - time_data = np.genfromtxt(f"{dir_0_1}/pseudo_rms.xvg", usecols=0) - frame_data = np.arange(len(time_data)) - frame_time_data = np.array([frame_data, time_data]).reshape(2, -1).T - np.savetxt( - f"{dir_0_1}/frame_time.tsv", - frame_time_data, - fmt=("%d", "%.3f"), - header="frame time(ps)", - delimiter="\t", - ) - - def load_frame_to_time(self, settings: MDsettings) -> Dict[int, float]: - # Read correspondence between frame and time from file - dir_0_1 = settings.each_replica(_cycle=0, _replica=1) - frame_time_array = np.loadtxt( - f"{dir_0_1}/frame_time.tsv", delimiter="\t", skiprows=1 - ) - frame_time_dict: Dict[int, float] = {} - for frame, time in frame_time_array: - frame_time_dict[int(frame)] = time - return frame_time_dict + # def export_by_gmx( + # self, + # settings: MDsettings, + # cycle: int, + # replica_rank: int, + # _results: List[Snapshot], + # ) -> None: + # f2t_dict = self.load_frame_to_time(settings) + + # results = [] + # dir = settings.each_cycle(_cycle=cycle) + # with open(f"{dir}/summary/cv_ranked.log", "r") as f: + # for line in f: + # _, replica, _, frame, _, _cv = line.split() + # results.append(Snapshot(int(replica), int(frame), _cv)) + + # for i in range(len(results)): + # results[i].frame = f2t_dict[results[i].frame] + + # if settings.n_replica > len(results): + # LOGGER.error( + # f"The total number of frames now is {len(results)}. " + # f"This is less than the number of replicas {settings.n_replica}" + # ) + # exit(1) + + # extension = settings.trajectory_extension + # from_dir = settings.each_replica( + # _cycle=cycle, _replica=results[replica_rank].replica + # ) + # out_dir = settings.each_replica(_cycle=cycle + 1, _replica=replica_rank + 1) + + # # treatment for centering option + # if settings.centering is True: + # centering_option = "-center" + # args_to_trjconv = f"{settings.centering_selection} System" + # else: + # centering_option = "" + # args_to_trjconv = "System" + + # # treatment for nojump option + # if settings.nojump is True: + # pbc_option = "-pbc nojump" + # else: + # pbc_option = "-pbc whole" + + # # First convert trj with -pbc option, and then extract with -b, -e options + # # If we do the both at the same time, the -pbc nojump does not work properly + # # Output the prd_image_prev_cycle to the next cycle to avoid overwrapping. + # cmd_trjconv = f"echo {args_to_trjconv} \ + # | {settings.cmd_gmx} trjconv \ + # -f {from_dir}/prd{extension} \ + # -o {out_dir}/prd_image_prev_cycle{extension} \ + # -s {from_dir}/prd.tpr \ + # -n {settings.index_file} \ + # {pbc_option} \ + # {centering_option} \ + # 1> {from_dir}/trjconv.log 2>&1" # NOQA: E221 + + # res_trjconv = subprocess.run(cmd_trjconv, shell=True) + + # if res_trjconv.returncode != 0: + # LOGGER.error("error occurred at trjconv command") + # LOGGER.error(f"see {from_dir}/trjconv.log") + # exit(1) + + # cmd_extract = f"echo System \ + # | {settings.cmd_gmx} trjconv \ + # -f {out_dir}/prd_image_prev_cycle{extension} \ + # -o {out_dir}/input{settings.structure_extension} \ + # -s {from_dir}/prd.tpr \ + # -b {results[replica_rank].frame} \ + # -e {results[replica_rank].frame} \ + # -novel \ + # 1> {from_dir}/extract.log 2>&1" # NOQA: E221 + + # res_extract = subprocess.run(cmd_extract, shell=True) + + # if res_extract.returncode != 0: + # LOGGER.error("error occurred at extract command") + # LOGGER.error(f"see {from_dir}/extract.log") + # exit(1) + + # # remove the intermediate trajectory + # res_rm = subprocess.run( + # f"rm {out_dir}/prd_image_prev_cycle{extension}", shell=True + # ) + # if res_rm.returncode != 0: + # LOGGER.error("error occurred at rm command") + # exit(1) + + # def frame_to_time(self, settings: MDsettings) -> None: + # # Output correspondence between frame and time to file + # dir_0_1 = settings.each_replica(_cycle=0, _replica=1) + # if Path(f"{dir_0_1}/frame_time.tsv").exists(): + # return + # cmd_pseudo_rms = f"echo 0 0 \ + # | {settings.cmd_gmx} rms \ + # -f {dir_0_1}/prd{settings.trajectory_extension} \ + # -s {dir_0_1}/prd.gro \ + # -o {dir_0_1}/pseudo_rms.xvg \ + # -nomw \ + # -xvg none 1> {dir_0_1}/pseudo_rms.log 2>&1" # NOQA: E221 + # res_pseudo_rms = subprocess.run(cmd_pseudo_rms, shell=True) + # if res_pseudo_rms.returncode != 0: + # LOGGER.error("error occurred at pseudo rms command") + # LOGGER.error(f"see {dir_0_1}/pseudo_rms.log") + # exit(1) + # time_data = np.genfromtxt(f"{dir_0_1}/pseudo_rms.xvg", usecols=0) + # frame_data = np.arange(len(time_data)) + # frame_time_data = np.array([frame_data, time_data]).reshape(2, -1).T + # np.savetxt( + # f"{dir_0_1}/frame_time.tsv", + # frame_time_data, + # fmt=("%d", "%.3f"), + # header="frame time(ps)", + # delimiter="\t", + # ) + + # def load_frame_to_time(self, settings: MDsettings) -> Dict[int, float]: + # # Read correspondence between frame and time from file + # dir_0_1 = settings.each_replica(_cycle=0, _replica=1) + # frame_time_array = np.loadtxt( + # f"{dir_0_1}/frame_time.tsv", delimiter="\t", skiprows=1 + # ) + # frame_time_dict: Dict[int, float] = {} + # for frame, time in frame_time_array: + # frame_time_dict[int(frame)] = time + # return frame_time_dict diff --git a/pacs/mdrun/exporter/superExporter.py b/pacs/mdrun/exporter/superExporter.py index 4cab4d0..3c61148 100644 --- a/pacs/mdrun/exporter/superExporter.py +++ b/pacs/mdrun/exporter/superExporter.py @@ -23,10 +23,14 @@ def export_each( pass def export(self, settings: MDsettings, cycle: int) -> None: + self.export_one_way(settings, cycle, "fore") + self.export_one_way(settings, cycle, "back") + + def export_one_way(self, settings: MDsettings, cycle: int, direction: str) -> None: pattern1 = r"replica (\d+) frame (\d+) cv \[([-\d.\s]+)\]" pattern2 = r"replica (\d+) frame (\d+) cv ([-\d.]+)" results = [] - dir = settings.each_cycle(_cycle=cycle) + dir = settings.each_direction(_cycle=cycle, _direction=direction) with open(f"{dir}/summary/cv_ranked.log", "r") as f: for line in f: match1 = re.search(pattern1, line) diff --git a/pacs/mdrun/simulator/superSimulator.py b/pacs/mdrun/simulator/superSimulator.py index 4dd8772..8c4088b 100644 --- a/pacs/mdrun/simulator/superSimulator.py +++ b/pacs/mdrun/simulator/superSimulator.py @@ -28,7 +28,7 @@ def run_parallel(self, settings: MDsettings, cycle: int) -> None: def run_parallel_one_way(self, settings: MDsettings, cycle: int, direction: str) -> None: if settings.n_parallel == 1 or cycle == 0: - self.run_serial(settings, cycle, direction) + self.run_serial(settings, cycle) return if settings.cmd_mpi != "": @@ -126,7 +126,7 @@ def run_parallel_MPI_one_way(self, settings: MDsettings, cycle: int, direction: ]: groupreplica.append(replica) if len(groupreplica) != n_parallel: - self.run_serial(settings, cycle, direction) + self.run_serial(settings, cycle) else: self.run_MPI(settings, cycle, direction, groupreplica) for replica in groupreplica: diff --git a/pacs/models/settings.py b/pacs/models/settings.py index b52492c..a54102e 100644 --- a/pacs/models/settings.py +++ b/pacs/models/settings.py @@ -2,6 +2,7 @@ import re from pathlib import Path +import numpy as np from pacs.utils.logger import generate_logger LOGGER = generate_logger(__name__) @@ -80,7 +81,8 @@ class MDsettings: # hidden option cmd_gmx: str = None - top_mdtraj: str = None + top_mdtraj_fore: str = None + top_mdtraj_back: str = None structure_extension: str = None # genrepresent option @@ -104,8 +106,6 @@ def each_replica( _trial = self.trial if _cycle is None: _cycle = self.max_cycle - if _direction is None: - _direction = self.direction if _replica is None: _replica = self.n_replica if _direction not in ["fore", "back"]: @@ -120,8 +120,6 @@ def each_direction(self, _trial: int = None, _cycle: int = None, _direction: str _trial = self.trial if _cycle is None: _cycle = self.cycle - if _direction is None: - _direction = self.direction if _direction not in ["fore", "back"]: LOGGER.error(f"direction={_direction} is given.") exit(1) @@ -142,9 +140,9 @@ def each_trial(self, _trial: int = None) -> str: def log_file(self) -> str: return f"{self.working_dir}/trial{self.trial:03}.log" - def rmmol_top(self, direction: str) -> None: - c0r1_dir = self.each_replica(_cycle=0, _direction=direction, _replica=1) - self.top_mdtraj = f"{c0r1_dir}/rmmol_top.pdb" + # def rmmol_top(self, direction: str) -> None: + # c0r1_dir = self.each_replica(_cycle=0, _direction=direction, _replica=1) + # self.top_mdtraj = f"{c0r1_dir}/rmmol_top.pdb" def check_bool(self, value: str) -> bool: if isinstance(value, str): @@ -277,7 +275,7 @@ def check(self) -> None: ) # Check if indexfile is set when gromacs - if self.simulator == "gromacs" and self.index_file is None: + if self.simulator == "gromacs" and (self.index_file_fore is None or self.index_file_back is None): LOGGER.error("index file is required for gromacs") exit(1) @@ -387,16 +385,23 @@ def check(self) -> None: ".gsd", ] # Retrieve the topology file from the input - top_mdtraj = next( - ( - v - for v in vars(self).values() - if hasattr(v, "as_posix") and v.suffix in top_extension - ), - None, - ) - if top_mdtraj is not None: - self.top_mdtraj = top_mdtraj + # top_mdtraj = next( + # ( + # v + # for v in vars(self).values() + # if hasattr(v, "as_posix") and v.suffix in top_extension + # ), + # None, + # ) + top_mdtraj_fore = Path(self.structure_fore) + top_mdtraj_back = Path(self.structure_back) + if top_mdtraj_fore is not None: + self.top_mdtraj_fore = top_mdtraj_fore + else: + LOGGER.error("a topology file required to read the trajectory is missing.") + exit(1) + if top_mdtraj_back is not None: + self.top_mdtraj_back = top_mdtraj_back else: LOGGER.error("a topology file required to read the trajectory is missing.") exit(1) @@ -465,3 +470,46 @@ def __eq__(self, other) -> bool: def __lt__(self, other) -> bool: return self.cv < other.cv + + +class ScoresInOnePair: + """ + Information of CV in a pair of (fore_replica, back_replica) + """ + def __init__(self, cycle: int, fore_replica: int, back_replica: int, cv_data: np.ndarray) -> None: + self.cycle: int = cycle + self.fore_replica: int = fore_replica + self.back_replica: int = back_replica + self.cv_data: np.ndarray = cv_data[:] + self.n_frames_fore: int = self.cv_data.shape[0] + self.n_frames_back: int = self.cv_data.shape[1] + + +class ScoresInCycle: + """ + Information of evaluation scores in a cycle + + """ + def __init__(self, cycle: int, n_replicas: int, n_frames_fore: int, n_frames_back: int) -> None: + self.cycle: int = cycle + self.n_replicas: int = n_replicas + self.n_frames_fore: int = n_frames_fore + self.n_frames_back: int = n_frames_back + self.cv_data: np.ndarray = np.zeros((n_replicas, n_replicas, n_frames_fore, n_frames_back)) + + def add(self, scores_in_one_pair: ScoresInOnePair) -> None: + # check shape + fore_replica = scores_in_one_pair.fore_replica + back_replica = scores_in_one_pair.back_replica + n_frames_fore = scores_in_one_pair.n_frames_fore + n_frames_back = scores_in_one_pair.n_frames_back + if n_frames_fore != self.n_frames_fore: + raise ValueError(f"n_frames_fore is different: {n_frames_fore} != {self.n_frames_fore}") + if n_frames_back != self.n_frames_back: + raise ValueError(f"n_frames_back is different: {n_frames_back} != {self.n_frames_back}") + + # add data + self.cv_data[scores_in_one_pair.fore_replica-1, scores_in_one_pair.back_replica-1, :, :] = scores_in_one_pair.cv_data + + def save(self, path: str) -> None: + np.save(path, self.cv_data) \ No newline at end of file