diff --git a/docs/source/api.rst b/docs/source/api.rst index 07f5303..1c53c80 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -13,4 +13,5 @@ API Reference api/tools api/mcmc api/scaler + api/parallel diff --git a/docs/source/api/parallel.rst b/docs/source/api/parallel.rst new file mode 100644 index 0000000..8e422ec --- /dev/null +++ b/docs/source/api/parallel.rst @@ -0,0 +1,5 @@ +Parallel +======== + +.. autoclass:: pocomc.parallel.MPIPool + :members: \ No newline at end of file diff --git a/docs/source/checkpoint.ipynb b/docs/source/checkpoint.ipynb index c77aae1..e406c8e 100644 --- a/docs/source/checkpoint.ipynb +++ b/docs/source/checkpoint.ipynb @@ -75,6 +75,38 @@ "source": [ "sampler.run(resume_state_path = \"states/pmc_3.state\")" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load and Add More Samples\n", + "\n", + "It is possible to add more samples to a finished run. This is useful when one wants to experiment with *small* runs until they get their analysis right, and then increase the number of required posterior samples to get publication-quality results. When ``save_every`` is not ``None``, pocoMC will save a *final* file when sampling is done. By default, this is called ``pmc_final.state``. We can load this state and change the termination criteria in order to add more samples, as follows:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sampler.run(n_total=16384, # This is the number of samples we want to draw in total, including the ones we already have.\n", + " n_evidence=16384, # This is the number of samples we want to draw for the evidence estimation.\n", + " resume_state_path = \"states/pmc_final.state\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this case, we chose to terminate sampling when the total ESS exceeds ``n_total=16384``, which is higher than the default value of ``n_total=4096``. Furthermore, we also provided a higher number of samples used for the evidence estimation. This means that the new evidence estimate will be more accurate than the original. However, could have chose to set ``n_evidence=0`` and only added more posterior samples." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] } ], "metadata": { diff --git a/docs/source/index.rst b/docs/source/index.rst index 625ab72..a1b04d1 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,8 +1,3 @@ -.. pocoMC documentation master file, created by - sphinx-quickstart on Fri Apr 29 13:25:54 2022. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. - | .. title:: pocoMC documentation @@ -119,6 +114,13 @@ Copyright 2022-2024 Minas Karamanis and contributors. Changelog ========= +**1.2.0 (11/06/24)** + +- Added ``MPIPool`` for parallelization. +- Fixed bugs in checkpointing when using MPI in NFS4 and BeeGFS filesystems. +- Automatically save final checkpoint file when finishing the run if ``save_every`` is not ``None``. +- Added option to continue sampling after completing the run. + **1.1.0 (31/05/24)** - Fix robustness issues with the Crank-Nicolson sampler. diff --git a/docs/source/install.rst b/docs/source/install.rst index fde55ef..e6f0b69 100644 --- a/docs/source/install.rst +++ b/docs/source/install.rst @@ -7,7 +7,7 @@ Dependencies ------------ **pocoMC** depends on ``numpy``, ``torch``, ``zuko``, ``tqdm``, ``scipy``, ``dill``, and ``multiprocess``. - +Optionally, you can install ``mpi4py`` for parallelization using the provided ``MPIPool``. Using pip --------- diff --git a/docs/source/parallelization.ipynb b/docs/source/parallelization.ipynb index ee64f3c..1f6ae9e 100644 --- a/docs/source/parallelization.ipynb +++ b/docs/source/parallelization.ipynb @@ -95,7 +95,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "When running on a High-Performance Computing (HPC) cluster with multiple nodes of many CPUs each, it may be beneficial to use Message Passing Interface (MPI) parallelization. A simple way to achieve this is using the ``mpi4py.futures`` package as follows:" + "When running on a High-Performance Computing (HPC) cluster with multiple nodes of many CPUs each, it may be beneficial to use Message Passing Interface (MPI) parallelization. A simple way to achieve this is using the provided ``MPIPool`` as follows. Please note that you will need to have ``mpi4py`` installed to use this option." ] }, { @@ -104,12 +104,10 @@ "metadata": {}, "outputs": [], "source": [ - "from mpi4py.futures import MPIPoolExecutor\n", - "\n", "import pocomc as pc\n", "\n", "if __name__ == '__main__':\n", - " with MPIPoolExecutor(256) as pool:\n", + " with pc.parallel.MPIPool() as pool:\n", " sampler = pc.Sampler(prior, log_likelihood, pool=pool)\n", " sampler.run()" ] @@ -118,7 +116,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "The above script should be executed via ``mpiexec -n 256 python -m mpi4py.futures script.py`` where 256 is the number of processes." + "The above script should be executed via ``mpiexec -n 256 python script.py`` where 256 is the number of processes." ] }, { diff --git a/docs/source/sampling.ipynb b/docs/source/sampling.ipynb index 5692d4a..95b7978 100644 --- a/docs/source/sampling.ipynb +++ b/docs/source/sampling.ipynb @@ -141,7 +141,22 @@ { "cell_type": "markdown", "metadata": {}, - "source": [] + "source": [ + "## Continue Running after Completion\n", + "It is possible to continue running the sampler after sampling has been completed in order to add more samples. This can be useful if the user requires more samples to be able to approximate posterior or estimate the evidence more accurately. This can be achieved easily by calling the ``run()`` method again with higher ``n_total`` and/or ``n_evidence`` values. For instance:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sampler.run(\n", + " n_total=16384,\n", + " n_evidence=16384,\n", + ")" + ] } ], "metadata": { diff --git a/pocomc/__init__.py b/pocomc/__init__.py index 054f2b1..76b2352 100644 --- a/pocomc/__init__.py +++ b/pocomc/__init__.py @@ -27,6 +27,7 @@ from .flow import * from .sampler import * from .prior import * +from .parallel import * from ._version import version __version__ = version diff --git a/pocomc/_version.py b/pocomc/_version.py index 40b743a..1350d4e 100644 --- a/pocomc/_version.py +++ b/pocomc/_version.py @@ -1 +1 @@ -version = "1.1.0" \ No newline at end of file +version = "1.2.0" \ No newline at end of file diff --git a/pocomc/parallel.py b/pocomc/parallel.py new file mode 100644 index 0000000..19a0511 --- /dev/null +++ b/pocomc/parallel.py @@ -0,0 +1,178 @@ +import sys +import atexit + +MPI = None + +def _import_mpi(use_dill=False): + global MPI + try: + from mpi4py import MPI as _MPI + if use_dill: + import dill + _MPI.pickle.__init__(dill.dumps, dill.loads, dill.HIGHEST_PROTOCOL) + MPI = _MPI + except: + raise ImportError("Please install mpi4py") + + return MPI + + +class MPIPool: + r"""A processing pool that distributes tasks using MPI. + With this pool class, the master process distributes tasks to worker + processes using an MPI communicator. + + + Parameters + ---------- + comm : :class:`mpi4py.MPI.Comm`, optional + An MPI communicator to distribute tasks with. If ``None``, this uses + ``MPI.COMM_WORLD`` by default. + use_dill : bool, optional + If ``True``, use dill for pickling objects. This is useful for + pickling functions and objects that are not picklable by the default + pickle module. Default is ``True``. + + Notes + ----- + This implementation is inspired by @juliohm in `this module + `_ + and was adapted from schwimmbad. + """ + + def __init__(self, comm=None, use_dill=True): + + global MPI + if MPI is None: + MPI = _import_mpi(use_dill=use_dill) + + self.comm = MPI.COMM_WORLD if comm is None else comm + + self.master = 0 + self.rank = self.comm.Get_rank() + + atexit.register(lambda: MPIPool.close(self)) + + if not self.is_master(): + # workers branch here and wait for work + self.wait() + sys.exit(0) + + self.workers = set(range(self.comm.size)) + self.workers.discard(self.master) + self.size = self.comm.Get_size() - 1 + + if self.size == 0: + raise ValueError("Tried to create an MPI pool, but there " + "was only one MPI process available. " + "Need at least two.") + + + def wait(self): + r"""Tell the workers to wait and listen for the master process. This is + called automatically when using :meth:`MPIPool.map` and doesn't need to + be called by the user. + """ + if self.is_master(): + return + + status = MPI.Status() + while True: + task = self.comm.recv(source=self.master, tag=MPI.ANY_TAG, status=status) + + if task is None: + # Worker told to quit work + break + + func, arg = task + result = func(arg) + # Worker is sending answer with tag + self.comm.ssend(result, self.master, status.tag) + + + def map(self, worker, tasks): + r"""Evaluate a function or callable on each task in parallel using MPI. + The callable, ``worker``, is called on each element of the ``tasks`` + iterable. The results are returned in the expected order. + + Parameters + ---------- + worker : callable + A function or callable object that is executed on each element of + the specified ``tasks`` iterable. This object must be picklable + (i.e. it can't be a function scoped within a function or a + ``lambda`` function). This should accept a single positional + argument and return a single object. + tasks : iterable + A list or iterable of tasks. Each task can be itself an iterable + (e.g., tuple) of values or data to pass in to the worker function. + + Returns + ------- + results : list + A list of results from the output of each ``worker()`` call. + """ + + # If not the master just wait for instructions. + if not self.is_master(): + self.wait() + return + + + workerset = self.workers.copy() + tasklist = [(tid, (worker, arg)) for tid, arg in enumerate(tasks)] + resultlist = [None] * len(tasklist) + pending = len(tasklist) + + while pending: + if workerset and tasklist: + worker = workerset.pop() + taskid, task = tasklist.pop() + # "Sent task %s to worker %s with tag %s" + self.comm.send(task, dest=worker, tag=taskid) + + if tasklist: + flag = self.comm.Iprobe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG) + if not flag: + continue + else: + self.comm.Probe(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG) + + status = MPI.Status() + result = self.comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, + status=status) + worker = status.source + taskid = status.tag + + # "Master received from worker %s with tag %s" + + workerset.add(worker) + resultlist[taskid] = result + pending -= 1 + + return resultlist + + + def close(self): + """ Tell all the workers to quit.""" + if self.is_worker(): + return + + for worker in self.workers: + self.comm.send(None, worker, 0) + + + def is_master(self): + return self.rank == 0 + + + def is_worker(self): + return self.rank != 0 + + + def __enter__(self): + return self + + + def __exit__(self, *args): + self.close() \ No newline at end of file diff --git a/pocomc/sampler.py b/pocomc/sampler.py index f3d1efe..eb3f2eb 100644 --- a/pocomc/sampler.py +++ b/pocomc/sampler.py @@ -1,6 +1,7 @@ from pathlib import Path from typing import Union +import os import dill import numpy as np from multiprocess import Pool @@ -239,6 +240,9 @@ def __init__(self, # Total ESS for termination self.n_total = None + # Number of samples for evidence estimation + self.n_evidence = None + # Particle manager self.particles = Particles(n_active, n_dim) @@ -382,18 +386,33 @@ def run(self, self.load_state(resume_state_path) t0 = self.t # Initialise progress bar - self.pbar = ProgressBar(self.progress) - self.pbar.update_stats(dict(calls=self.particles.get("calls", -1), - beta=self.particles.get("beta", -1), - logZ=self.particles.get("logz", -1))) + self.pbar = ProgressBar(self.progress, initial=t0) + self.pbar.update_stats(dict(beta=self.particles.get("beta", -1), + calls=self.particles.get("calls", -1), + ESS=self.particles.get("ess", -1), + logZ=self.particles.get("logz", -1), + logP=np.mean(self.particles.get("logp", -1)+self.particles.get("logl", -1)), + acc=self.particles.get("accept", -1), + steps=self.particles.get("steps", -1), + eff=self.particles.get("efficiency", -1))) else: t0 = self.t # Run parameters - self.n_total = int(n_total) self.progress = progress # Initialise progress bar self.pbar = ProgressBar(self.progress) + self.pbar.update_stats(dict(beta=0.0, + calls=self.calls, + ESS=self.n_effective, + logZ=0.0, + logP=0.0, + acc=0.0, + steps=0, + eff=0.0)) + + self.n_total = int(n_total) + self.n_evidence = int(n_evidence) # Initialise particles if self.prior_samples is None: @@ -422,11 +441,11 @@ def run(self, self.pbar.update_stats(dict(calls=self.particles.get("calls", -1), beta=self.particles.get("beta", -1), + ESS=int(self.particles.get("ess", -1)), logZ=self.particles.get("logz", -1), - ESS=self.particles.get("ess", -1), + logP=np.mean(self.particles.get("logp", -1)+self.particles.get("logl", -1)), acc=self.particles.get("accept", -1), steps=self.particles.get("steps", -1), - logP=np.mean(self.particles.get("logp", -1)+self.particles.get("logl", -1)), eff=self.particles.get("efficiency", -1))) self.pbar.update_iter() @@ -456,12 +475,17 @@ def run(self, self.particles.update(self.current_particles) # Compute evidence - if n_evidence > 0 and self.preconditioned: - self._compute_evidence(int(n_evidence)) + if self.n_evidence > 0 and self.preconditioned: + self._compute_evidence(self.n_evidence) else: _, self.logz = self.particles.compute_logw_and_logz(1.0) self.logz_err = None - + + # Save final state + if save_every is not None: + self.save_state(Path(self.output_dir) / f'{self.output_label}_final.state') + + # Close progress bar self.pbar.close() def _not_termination(self, current_particles): @@ -568,6 +592,7 @@ def _mutate(self, current_particles): current_particles["steps"] = results.get('steps') current_particles["accept"] = results.get('accept') current_particles["calls"] = current_particles.get("calls") + results.get('calls') + self.calls = current_particles.get("calls") self.proposal_scale = results.get('proposal_scale') return current_particles @@ -694,13 +719,13 @@ def get_weights_and_ess(beta): weights = weights_prev logz = self.particles.get("logz", index=-1) ess_est = ess_est_prev - self.pbar.update_stats(dict(beta=beta, ESS=ess_est_prev, logZ=logz)) + self.pbar.update_stats(dict(beta=beta, ESS=int(ess_est_prev), logZ=logz)) elif ess_est_max >= self.n_effective: beta = beta_max weights = weights_max _, logz = self.particles.compute_logw_and_logz(beta) ess_est = ess_est_max - self.pbar.update_stats(dict(beta=beta, ESS=ess_est_max, logZ=logz)) + self.pbar.update_stats(dict(beta=beta, ESS=int(ess_est_max), logZ=logz)) else: while True: beta = (beta_max + beta_min) * 0.5 @@ -709,7 +734,7 @@ def get_weights_and_ess(beta): if np.abs(ess_est - self.n_effective) < 0.01 * self.n_effective or beta == 1.0: _, logz = self.particles.compute_logw_and_logz(beta) - self.pbar.update_stats(dict(beta=beta, ESS=ess_est, logZ=logz)) + self.pbar.update_stats(dict(beta=beta, ESS=int(ess_est), logZ=logz)) break elif ess_est < self.n_effective: beta_max = beta @@ -836,6 +861,9 @@ def _compute_evidence(self, n=5_000): dlogz = np.std([np.logaddexp.reduce(logw[np.random.choice(len(logw), len(logw))]) - np.log(len(logw)) for _ in range(np.maximum(n,1000))]) + self.calls += n + self.pbar.update_stats(dict(calls=self.calls)) + self.logz = logz self.logz_err = dlogz return logz, dlogz @@ -949,7 +977,8 @@ def save_state(self, path: Union[str, Path]): """ print(f'Saving PMC state to {path}') Path(path).parent.mkdir(exist_ok=True) - with open(path, 'wb') as f: + temp_path = Path(path).with_suffix('.temp') + with open(temp_path, 'wb') as f: state = self.__dict__.copy() del state['pbar'] # Cannot be pickled try: @@ -959,7 +988,12 @@ def save_state(self, path: Union[str, Path]): del state['distribute'] # remove `pool.map` function hook except BaseException as e: print(e) + dill.dump(file=f, obj=state) + f.flush() + os.fsync(f.fileno()) + + os.rename(temp_path, path) def load_state(self, path: Union[str, Path]): """Load state of sampler from file. diff --git a/pocomc/tools.py b/pocomc/tools.py index a3fa61d..1d0c872 100644 --- a/pocomc/tools.py +++ b/pocomc/tools.py @@ -195,8 +195,8 @@ class ProgressBar: show : `bool` Whether or not to show a progress bar. Default is ``True``. """ - def __init__(self, show: bool = True): - self.progress_bar = tqdm(desc='Iter', disable=not show) + def __init__(self, show: bool = True, initial=0): + self.progress_bar = tqdm(desc='Iter', disable=not show, initial=initial) self.info = dict() def update_stats(self, info):