feat: Add support for peak list file in plot_spectra function
gbouvignies committed Jul 11, 2024
1 parent d4563e7 commit c8967bc
Showing 13 changed files with 266 additions and 308 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
{
"python.analysis.typeCheckingMode": "basic"
}
28 changes: 14 additions & 14 deletions pdm.lock

Some generated files are not rendered by default.

72 changes: 45 additions & 27 deletions peakfit/cli.py
@@ -1,42 +1,60 @@
"""The parsing module contains the code for the parsing of command-line arguments."""

from __future__ import annotations

from argparse import ArgumentParser
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class Arguments:
"""The dataclass for the command-line arguments."""

path_spectra: list[Path] = field(default_factory=list)
path_list: Path = field(default_factory=Path)
path_z_values: list[Path] = field(default_factory=list)
contour_level: float | None = None
noise: float | None = None
path_output: Path = field(default_factory=Path)
refine_nb: int = 1
fixed: bool = False
pvoigt: bool = False
lorentzian: bool = False
gaussian: bool = False
jx: bool = False
phx: bool = False
phy: bool = False
exclude: list[int] = field(default_factory=list)


def build_parser() -> ArgumentParser:
"""Parse the command-line arguments."""
description = "Perform peak integration in pseudo-3D spectra."

parser = ArgumentParser(description=description)

parser.add_argument(
"--spectra",
"-s",
dest="path_spectra",
required=True,
type=Path,
nargs="+",
)
parser.add_argument("--list", "-l", dest="path_list", required=True, type=Path)
parser.add_argument(
"--zvalues",
"-z",
dest="path_z_values",
required=True,
nargs="+",
)
parser.add_argument("--ct", "-t", dest="contour_level", type=float)
parser.add_argument("-s", dest="path_spectra", type=Path, required=True, nargs="+")
parser.add_argument("-l", dest="path_list", type=Path, required=True)
parser.add_argument("-z", dest="path_z_values", type=Path, required=True, nargs="+")
parser.add_argument("-t", dest="contour_level", type=float)
parser.add_argument("-n", dest="noise", type=float)
parser.add_argument("-o", dest="path_output", type=Path, default="Fits")
parser.add_argument("--refine", "-r", dest="refine_nb", type=int, default=1)
parser.add_argument("--out", "-o", dest="path_output", default="Fits", type=Path)
parser.add_argument("--noise", "-n", dest="noise", type=float)
parser.add_argument("--fixed", dest="fixed", action="store_true")
parser.add_argument("--pvoigt", dest="pvoigt", action="store_true")
parser.add_argument("--lorentzian", dest="lorentzian", action="store_true")
parser.add_argument("--gaussian", dest="gaussian", action="store_true")
parser.add_argument("--jx", dest="jx", action="store_true")
parser.add_argument("--exclude", dest="exclude", type=int, nargs="+")
parser.add_argument("--fixed", action="store_true")
parser.add_argument("--pvoigt", action="store_true")
parser.add_argument("--lorentzian", action="store_true")
parser.add_argument("--gaussian", action="store_true")
parser.add_argument("--jx", action="store_true")
parser.add_argument("--phx", action="store_true")
parser.add_argument("--phy", action="store_true")
parser.add_argument("--exclude", type=int, nargs="+", default=[])

return parser


def parse_args() -> Arguments:
"""Parse the command-line arguments."""
args = Arguments()
parser = build_parser()
parser.parse_args(namespace=args)

return args
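
As a usage note (not part of the commit), build_parser() writes into the typed Arguments dataclass rather than a plain argparse.Namespace, which is what parse_args() does with sys.argv; a minimal sketch of driving it from Python, with hypothetical file names:

```python
from peakfit.cli import Arguments, build_parser

# Minimal sketch: parse an explicit argv list into the typed Arguments
# dataclass, mirroring what parse_args() does with sys.argv.
# The file names below are hypothetical placeholders.
args = build_parser().parse_args(
    ["-s", "spectrum.ft2", "-l", "peaks.list", "-z", "z_values.txt"],
    namespace=Arguments(),
)
print(args.path_spectra, args.path_list, args.noise)
```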
18 changes: 0 additions & 18 deletions peakfit/computing.py
@@ -34,24 +34,6 @@ def calculate_amplitudes_err(
return amplitudes, amplitudes_err.T


# def calculate_amplitudes_err(
# shapes: FloatArray, data: FloatArray
# ) -> tuple[FloatArray, FloatArray]:
# amplitudes, chi2, _rank, _s = np.linalg.lstsq(shapes.T, data, rcond=None)
# cov = np.linalg.pinv(shapes @ shapes.T)

# residuals = data - shapes.T @ amplitudes
# leverage = np.diagonal(shapes.T @ cov @ shapes)
# # # HC3, Mackinnon and White estimator
# res_scaled = residuals.T / (1 - leverage)
# cov_robust = cov @ shapes @ (shapes.T * res_scaled[:, :, np.newaxis] ** 2) @ cov
# amplitudes_err = np.sqrt(np.diagonal(cov_robust, axis1=1, axis2=2))

# amplitudes_err = np.full_like(amplitudes_err, np.mean(amplitudes_err))

# return amplitudes, amplitudes_err.T
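
For reference (not part of the diff), the commented-out block above implements the HC3 (MacKinnon–White) heteroscedasticity-consistent covariance estimate; writing X = shapes.T for the design matrix, e for the residuals, and h_ii for the leverages, it computes roughly

$$
\widehat{\mathrm{Cov}}_{\mathrm{HC3}}(\hat a)
  = (X^\top X)^{-1} X^\top
    \operatorname{diag}\!\left(\frac{e_i^{2}}{(1 - h_{ii})^{2}}\right)
    X\, (X^\top X)^{-1},
\qquad
h_{ii} = \left[ X (X^\top X)^{-1} X^\top \right]_{ii}.
$$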


def calculate_shape_heights(
params: lf.Parameters, cluster: Cluster
) -> tuple[FloatArray, FloatArray]:
5 changes: 2 additions & 3 deletions peakfit/noise.py
@@ -1,17 +1,16 @@
from argparse import Namespace

import numpy as np
from lmfit.models import GaussianModel
from numpy.typing import NDArray

from peakfit.cli import Arguments
from peakfit.messages import print_estimated_noise
from peakfit.spectra import Spectra

FloatArray = NDArray[np.float64]
IntArray = NDArray[np.int_]


def prepare_noise_level(clargs: Namespace, spectra: Spectra) -> float:
def prepare_noise_level(clargs: Arguments, spectra: Spectra) -> float:
"""Prepare the noise level for fitting."""
if clargs.noise is not None and clargs.noise < 0.0:
clargs.noise = None
8 changes: 5 additions & 3 deletions peakfit/peak.py
@@ -4,6 +4,7 @@
import numpy as np
from numpy.typing import NDArray

from peakfit.cli import Arguments
from peakfit.shapes import SHAPES, Shape
from peakfit.spectra import Spectra

@@ -28,11 +29,11 @@ def create_params(self) -> lf.Parameters:
params.update(shape.create_params())
return params

def fix_params(self, params) -> None:
def fix_params(self, params: lf.Parameters) -> None:
for shape in self.shapes:
shape.fix_params(params)

def release_params(self, params) -> None:
def release_params(self, params: lf.Parameters) -> None:
for shape in self.shapes:
shape.release_params(params)

@@ -72,9 +73,10 @@ def create_peak(
positions: Sequence[float],
shape_names: list[str],
spectra: Spectra,
args: Arguments,
) -> Peak:
shapes = [
SHAPES[shape_name](name, center, spectra, dim)
SHAPES[shape_name](name, center, spectra, dim, args)
for dim, (center, shape_name) in enumerate(
zip(positions, shape_names, strict=False), start=1
)
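
For intuition (not from the diff), the zip/enumerate pattern in create_peak pairs each spectral dimension with its lineshape name, starting the dimension index at 1; a toy illustration of the indexing it produces:

```python
positions = [8.25, 120.3]            # hypothetical ppm positions (F2, F1)
shape_names = ["pvoigt", "pvoigt"]   # one lineshape name per dimension

# Mirrors the loop in create_peak: dim starts at 1, and each (center, name)
# pair would drive one Shape constructor call.
for dim, (center, shape_name) in enumerate(
    zip(positions, shape_names, strict=False), start=1
):
    print(dim, shape_name, center)
# 1 pvoigt 8.25
# 2 pvoigt 120.3
```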
17 changes: 8 additions & 9 deletions peakfit/peakfit.py
@@ -1,6 +1,5 @@
"""Main module for peak fitting."""

from argparse import Namespace
from collections.abc import Sequence
from pathlib import Path

@@ -9,7 +8,7 @@
import numpy as np
import numpy.typing as npt

from peakfit.cli import build_parser
from peakfit.cli import Arguments, parse_args
from peakfit.clustering import Cluster, create_clusters
from peakfit.computing import (
residuals,
@@ -42,7 +41,7 @@ def update_params(params: lf.Parameters, params_all: lf.Parameters) -> lf.Parame
return params


def fit_clusters(clargs: Namespace, clusters: Sequence[Cluster]) -> lf.Parameters:
def fit_clusters(clargs: Arguments, clusters: Sequence[Cluster]) -> lf.Parameters:
"""Fit all clusters and return shifts."""
print_fitting()
params_all = lf.Parameters()
@@ -54,7 +53,8 @@ def fit_clusters(clargs: Arguments, clusters: Sequence[Cluster]) -> lf.Parameter
print_peaks(cluster.peaks)
params = create_params(cluster.peaks, fixed=clargs.fixed)
params = update_params(params, params_all)
out = lf.minimize(residuals, params, args=(cluster, clargs.noise))
mini = lf.Minimizer(residuals, params, fcn_args=(cluster, clargs.noise))
out = mini.least_squares(verbose=2)
print_fit_report(out)
params_all.update(getattr(out, "params", lf.Parameters()))
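
For context (not part of the diff), building an lf.Minimizer and calling least_squares() explicitly, as above, lets solver options such as verbose be forwarded to scipy.optimize.least_squares; a self-contained sketch of the same pattern, with a toy residual standing in for residuals(params, cluster, noise):

```python
import lmfit as lf
import numpy as np

def toy_residuals(params: lf.Parameters, x: np.ndarray, data: np.ndarray) -> np.ndarray:
    # Toy exponential-decay residual standing in for peakfit's residuals().
    model = params["amp"].value * np.exp(-params["rate"].value * x)
    return model - data

x = np.linspace(0.0, 1.0, 50)
data = 3.0 * np.exp(-2.0 * x) + 0.01 * np.random.default_rng(0).normal(size=x.size)

params = lf.Parameters()
params.add("amp", value=1.0)
params.add("rate", value=1.0, min=0.0)

# Same pattern as in fit_clusters: construct the Minimizer once, then pick
# the solver; keyword arguments of least_squares() are passed through to
# scipy.optimize.least_squares (verbose=2 prints per-iteration progress).
mini = lf.Minimizer(toy_residuals, params, fcn_args=(x, data))
out = mini.least_squares(verbose=2)
print(lf.fit_report(out))
```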

@@ -83,23 +83,22 @@ def main() -> None:
"""Run peakfit."""
print_logo()

parser = build_parser()
clargs = parser.parse_args()
clargs = parse_args()

spectra = read_spectra(clargs.path_spectra, clargs.path_z_values, clargs.exclude)

clargs.noise = prepare_noise_level(clargs, spectra)

shape_names = get_shape_names(clargs, spectra)
peaks = read_list(clargs.path_list, spectra, shape_names)
peaks = read_list(spectra, shape_names, clargs)

clargs.contour_level = clargs.contour_level or 10.0 * clargs.noise
clargs.contour_level = clargs.contour_level or 5.0 * clargs.noise
clusters = create_clusters(spectra, peaks, clargs.contour_level)

clargs.path_output.mkdir(parents=True, exist_ok=True)
params = fit_clusters(clargs, clusters)

write_profiles(clargs.path_output, spectra.z_values, clusters, params)
write_profiles(clargs.path_output, spectra.z_values, clusters, params, clargs)

export_html(clargs.path_output / "logs.html")

37 changes: 23 additions & 14 deletions peakfit/peaklist.py
@@ -5,10 +5,11 @@
import numpy as np
import pandas as pd

from peakfit.cli import Arguments
from peakfit.peak import Peak, create_peak
from peakfit.spectra import Spectra

Reader = Callable[[Path, Spectra, list[str]], list[Peak]]
Reader = Callable[[Path, Spectra, list[str], Arguments], list[Peak]]

READERS: dict[str, Reader] = {}

@@ -29,18 +30,18 @@ def decorator(fn: Reader) -> Reader:


def _create_peak_list(
peaks: pd.DataFrame, spectra: Spectra, shape_names: list[str]
peaks: pd.DataFrame, spectra: Spectra, shape_names: list[str], args: Arguments
) -> list[Peak]:
"""Create a list of Peak objects from a DataFrame."""
return [
create_peak(name, positions, shape_names, spectra)
create_peak(name, positions, shape_names, spectra, args)
for name, *positions in peaks.itertuples(index=False, name=None)
]


@register_reader("list")
def read_sparky_list(
path: Path, spectra: Spectra, shape_names: list[str]
path: Path, spectra: Spectra, shape_names: list[str], args: Arguments
) -> list[Peak]:
"""Read a Sparky list file and return a list of peaks."""
with path.open() as f:
@@ -53,7 +54,7 @@ def read_sparky_list(
encoding="utf-8",
names=("name", "y0_ppm", "x0_ppm"),
)
return _create_peak_list(peaks, spectra, shape_names)
return _create_peak_list(peaks, spectra, shape_names, args)


@np.vectorize
@@ -74,36 +75,44 @@ def _read_ccpn_list(
spectra: Spectra,
read_func: Callable[[Path], pd.DataFrame],
shape_names: list[str],
args: Arguments,
) -> list[Peak]:
"""Read a generic list file and return a list of peaks."""
peaks_csv = read_func(path)
names = _make_names(peaks_csv["Assign F2"], peaks_csv["Assign F1"], peaks_csv["#"])
peaks = pd.DataFrame(
{"name": names, "y0_ppm": peaks_csv["Pos F2"], "x0_ppm": peaks_csv["Pos F1"]}
)
return _create_peak_list(peaks, spectra, shape_names)
return _create_peak_list(peaks, spectra, shape_names, args)


@register_reader("csv")
def read_csv_list(path: Path, spectra: Spectra, shape_names: list[str]) -> list[Peak]:
return _read_ccpn_list(path, spectra, pd.read_csv, shape_names)
def read_csv_list(
path: Path, spectra: Spectra, shape_names: list[str], args: Arguments
) -> list[Peak]:
return _read_ccpn_list(path, spectra, pd.read_csv, shape_names, args)


@register_reader("json")
def read_json_list(path: Path, spectra: Spectra, shape_names: list[str]) -> list[Peak]:
return _read_ccpn_list(path, spectra, pd.read_json, shape_names)
def read_json_list(
path: Path, spectra: Spectra, shape_names: list[str], args: Arguments
) -> list[Peak]:
return _read_ccpn_list(path, spectra, pd.read_json, shape_names, args)


@register_reader(["xlsx", "xls"])
def read_excel_list(path: Path, spectra: Spectra, shape_names: list[str]) -> list[Peak]:
return _read_ccpn_list(path, spectra, pd.read_excel, shape_names)
def read_excel_list(
path: Path, spectra: Spectra, shape_names: list[str], args: Arguments
) -> list[Peak]:
return _read_ccpn_list(path, spectra, pd.read_excel, shape_names, args)


def read_list(path: Path, spectra: Spectra, shape_names: list[str]) -> list[Peak]:
def read_list(spectra: Spectra, shape_names: list[str], args: Arguments) -> list[Peak]:
"""Read a list of peaks from a file based on its extension."""
path = args.path_list
extension = path.suffix.lstrip(".")
reader = READERS.get(extension)
if reader is None:
msg = f"No reader registered for extension: {extension}"
raise ValueError(msg)
return reader(path, spectra, shape_names)
return reader(path, spectra, shape_names, args)
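
For illustration only (hypothetical, not in this commit), the extension registry above makes supporting another format a one-function change; a sketch for a tab-separated list, placed inside peakfit/peaklist.py and assuming the same name/y0_ppm/x0_ppm column layout as the Sparky reader:

```python
@register_reader("tsv")
def read_tsv_list(
    path: Path, spectra: Spectra, shape_names: list[str], args: Arguments
) -> list[Peak]:
    # Hypothetical reader: tab-separated columns "name", "y0_ppm", "x0_ppm".
    peaks = pd.read_csv(path, sep="\t", names=("name", "y0_ppm", "x0_ppm"))
    return _create_peak_list(peaks, spectra, shape_names, args)
```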
3 changes: 3 additions & 0 deletions peakfit/plotting/main.py
@@ -47,6 +47,9 @@ def main() -> None:
spectra_parser.add_argument(
"--sim", dest="data_sim", required=True, help="Path to the second spectrum file"
)
spectra_parser.add_argument(
"--plist", dest="peak_list", help="Path to the peak list file"
)

args = parser.parse_args()
