Skip to content

Commit

Permalink
Add the possibility to work with 3D spectra
Browse files Browse the repository at this point in the history
  • Loading branch information
gbouvignies committed Jul 18, 2024
1 parent c8967bc commit b935842
Show file tree
Hide file tree
Showing 15 changed files with 236 additions and 392 deletions.
16 changes: 15 additions & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions peakfit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
class Arguments:
"""The dataclass for the command-line arguments."""

path_spectra: list[Path] = field(default_factory=list)
path_spectra: Path = field(default_factory=Path)
path_list: Path = field(default_factory=Path)
path_z_values: list[Path] = field(default_factory=list)
path_z_values: Path | None = None
contour_level: float | None = None
noise: float | None = None
path_output: Path = field(default_factory=Path)
path_output: Path = Path("Fits")
refine_nb: int = 1
fixed: bool = False
pvoigt: bool = False
Expand All @@ -32,9 +32,9 @@ def build_parser() -> ArgumentParser:

parser = ArgumentParser(description=description)

parser.add_argument("-s", dest="path_spectra", type=Path, required=True, nargs="+")
parser.add_argument("-s", dest="path_spectra", type=Path, required=True)
parser.add_argument("-l", dest="path_list", type=Path, required=True)
parser.add_argument("-z", dest="path_z_values", type=Path, required=True, nargs="+")
parser.add_argument("-z", dest="path_z_values", type=Path)
parser.add_argument("-t", dest="contour_level", type=float)
parser.add_argument("-n", dest="noise", type=float)
parser.add_argument("-o", dest="path_output", type=Path, default="Fits")
Expand Down
5 changes: 1 addition & 4 deletions peakfit/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@

import networkx as nx
import numpy as np
import numpy.typing as npt
from scipy.ndimage import binary_dilation, generate_binary_structure, label

from peakfit.messages import print_segmenting
from peakfit.peak import Peak
from peakfit.spectra import Spectra

FloatArray = npt.NDArray[np.float64]
IntArray = npt.NDArray[np.int_]
from peakfit.typing import FloatArray, IntArray


@dataclass
Expand Down
39 changes: 16 additions & 23 deletions peakfit/computing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

import lmfit as lf
import numpy as np
import numpy.typing as npt

from peakfit.clustering import Cluster

FloatArray = npt.NDArray[np.float64]
from peakfit.typing import FloatArray


def calculate_shapes(params: lf.Parameters, cluster: Cluster) -> FloatArray:
Expand All @@ -19,21 +17,6 @@ def calculate_amplitudes(shapes: FloatArray, data: FloatArray) -> FloatArray:
return np.linalg.lstsq(shapes.T, data, rcond=None)[0]


def calculate_amplitudes_err(
shapes: FloatArray, data: FloatArray
) -> tuple[FloatArray, FloatArray]:
amplitudes, chi2, _rank, _s = np.linalg.lstsq(shapes.T, data, rcond=None)
cov = np.linalg.pinv(shapes @ shapes.T)
n = data.shape[0]
k = amplitudes.shape[0]
cov_scaled = cov * chi2.reshape(-1, 1, 1) / (n - k)
amplitudes_err = np.sqrt(np.diagonal(cov_scaled, axis1=1, axis2=2))

amplitudes_err = np.full_like(amplitudes_err, np.mean(amplitudes_err))

return amplitudes, amplitudes_err.T


def calculate_shape_heights(
params: lf.Parameters, cluster: Cluster
) -> tuple[FloatArray, FloatArray]:
Expand All @@ -48,16 +31,26 @@ def residuals(params: lf.Parameters, cluster: Cluster, noise: float) -> FloatArr


def simulate_data(
params: lf.Parameters, cluster: Cluster, data: FloatArray
params: lf.Parameters, clusters: Sequence[Cluster], data: FloatArray
) -> FloatArray:
_shapes, amplitudes = calculate_shape_heights(params, cluster)
cluster_all = Cluster.from_clusters([cluster])
amplitudes_list: list[FloatArray] = []
for cluster in clusters:
shapes, amplitudes = calculate_shape_heights(params, cluster)
amplitudes_list.append(amplitudes)
amplitudes = np.concatenate(amplitudes_list)
cluster_all = Cluster.from_clusters(clusters)
cluster_all.positions = [
indices.ravel() for indices in list(np.indices(data.shape[1:]))
]
shapes = calculate_shapes(params, cluster_all)

return (amplitudes.T @ shapes).reshape(-1, *data.shape[1:])
return sum(
(
amplitudes[index][:, np.newaxis]
* peak.evaluate(cluster_all.positions, params)
for index, peak in enumerate(cluster_all.peaks)
),
start=np.array(0.0),
).reshape(data.shape)


def update_cluster_corrections(
Expand Down
122 changes: 66 additions & 56 deletions peakfit/nmrpipe.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from dataclasses import dataclass, field
from typing import Any
from typing import Any, TypeVar

import numpy as np
from numpy.typing import NDArray

FloatArray = NDArray[np.float64]
ArrayInt = NDArray[np.int_]
from peakfit.typing import FloatArray

ArrayInt = NDArray[np.int_]
T = TypeVar("T", float, FloatArray)

P1_MIN = 175.0
P1_MAX = 185.0


@dataclass()
@dataclass
class SpectralParameters:
size: int
sw: float
Expand All @@ -25,94 +26,103 @@ class SpectralParameters:
apodq3: float
p180: bool
direct: bool
ft: bool
delta: float = field(init=False)
first: float = field(init=False)

def __post_init__(self) -> None:
# derived units (these are in ppm)
self.delta = -self.sw / (self.size * self.obs)
self.first = self.car / self.obs - self.delta * self.size / 2.0
self.delta = (
-self.sw / (self.size * self.obs) if self.size * self.obs != 0.0 else 0.0
)
self.first = (
self.car / self.obs - self.delta * self.size / 2.0
if self.obs != 0.0
else 0.0
)

def hz2pts_delta[T: (float, FloatArray)](self, hz: T) -> T:
def hz2pts_delta(self, hz: T) -> T:
return hz / (self.obs * self.delta)

def pts2hz_delta[T: (float, FloatArray)](self, pts: T) -> T:
def pts2hz_delta(self, pts: T) -> T:
return pts * self.obs * self.delta

def hz2pts[T: (float, FloatArray)](self, hz: T) -> T:
def hz2pts(self, hz: T) -> T:
return ((hz / self.obs) - self.first) / self.delta

def hz2pt_i(self, hz: float) -> int:
return int(round(self.hz2pts(hz))) % self.size

def pts2hz[T: (float, FloatArray)](self, pts: T) -> T:
def pts2hz(self, pts: T) -> T:
return (pts * self.delta + self.first) * self.obs

def ppm2pts[T: (float, FloatArray)](self, ppm: T) -> T:
def ppm2pts(self, ppm: T) -> T:
return (ppm - self.first) / self.delta

def ppm2pt_i(self, ppm: float) -> int:
return int(round(self.ppm2pts(ppm))) % self.size

def pts2ppm[T: (float, FloatArray)](self, pts: T) -> T:
def pts2ppm(self, pts: T) -> T:
return (pts * self.delta) + self.first

def hz2ppm[T: (float, FloatArray)](self, hz: T) -> T:
def hz2ppm(self, hz: T) -> T:
return hz / self.obs


def read_spectral_parameters(
dic: dict[str, Any], data: FloatArray
) -> list[SpectralParameters]:
fdf_in_order = [f"FDF{int(dic[f'FDDIMORDER{i}'])}" for i in range(1, data.ndim + 1)]
fdf_direct = fdf_in_order[0]
labels = ["FDSIZE", "FDSPECNUM", "FDF3SIZE", "FDF4SIZE"]
ndsizes = {
fdf: int(dic.get(label, 1))
for fdf, label in zip(fdf_in_order, labels, strict=False)
}

spec_params = []
for fdf in reversed(fdf_in_order):
size = ndsizes[fdf]
aq_time = calculate_acquisition_time(dic, fdf, size, fdf_direct)
sw, obs, orig = (
dic.get(f"{fdf}SW", 1.0),
dic.get(f"{fdf}OBS", 1.0),
dic.get(f"{fdf}ORIG", 0.0),
)
car = orig + sw / 2.0 - sw / size
spec_params: list[SpectralParameters] = []

for i in range(data.ndim):
size = data.shape[i]
fdf = f"FDF{int(dic['FDDIMORDER'][data.ndim - 1 - i])}"
is_direct = i == data.ndim - 1
ft = dic.get(f"{fdf}FTFLAG", 0.0) == 1.0

if ft:
sw = dic.get(f"{fdf}SW", 1.0)
orig = dic.get(f"{fdf}ORIG", 0.0)
obs = dic.get(f"{fdf}OBS", 1.0)
car = orig + sw / 2.0 - sw / size
aq_time = dic.get(f"{fdf}APOD", 0.0) / max(sw, 1e-6)
p180 = P1_MIN <= abs(dic.get(f"{fdf}P1", 0.0)) <= P1_MAX
else:
sw = obs = car = aq_time = 1.0
p180 = False

p180 = P1_MIN <= abs(dic.get(f"{fdf}P1", 0.0)) <= P1_MAX

direct = fdf == fdf_direct
spec_params.append(
SpectralParameters(
size,
sw,
obs,
car,
aq_time,
dic.get(f"{fdf}APODCODE", 0.0),
dic.get(f"{fdf}APODQ1", 0.0),
dic.get(f"{fdf}APODQ2", 0.0),
dic.get(f"{fdf}APODQ3", 0.0),
p180,
direct,
size=size,
sw=sw,
obs=obs,
car=car,
aq_time=aq_time,
apocode=dic.get(f"{fdf}APODCODE", 0.0),
apodq1=dic.get(f"{fdf}APODQ1", 0.0),
apodq2=dic.get(f"{fdf}APODQ2", 0.0),
apodq3=dic.get(f"{fdf}APODQ3", 0.0),
p180=p180,
direct=is_direct,
ft=ft,
)
)

return spec_params


def calculate_acquisition_time(
dic: dict[str, Any], fdf: str, size: int, fdf_direct: str
) -> float:
aq_time = 0.0
if dic.get(f"{fdf}FTSIZE", 0.0) != 0.0:
aq_time = dic[f"{fdf}TDSIZE"] / dic[f"{fdf}SW"] * size / dic[f"{fdf}FTSIZE"]
if fdf == fdf_direct:
aq_time *= 1.0 - dic.get("FDDMXVAL", 0.0) / dic[f"{fdf}TDSIZE"]
correction_factor = 1.0 if dic.get(f"{fdf}APODCODE", 0.0) == 1.0 else 0.5
aq_time *= 1.0 - correction_factor / dic[f"{fdf}TDSIZE"]
return aq_time
# def calculate_acquisition_time(
# dic: dict[str, Any], fdf: str, size: int, *, is_direct: bool
# ) -> float:
# if dic.get(f"{fdf}FTSIZE", 0.0) == 0.0:
# return 0.0

# aq_time = dic[f"{fdf}TDSIZE"] / dic[f"{fdf}SW"] * size / dic[f"{fdf}FTSIZE"]

# if is_direct:
# aq_time *= 1.0 - dic.get("FDDMXVAL", 0.0) / dic[f"{fdf}TDSIZE"]

# correction_factor = 1.0 if dic.get(f"{fdf}APODCODE", 0.0) == 1.0 else 0.5
# aq_time *= 1.0 - correction_factor / dic[f"{fdf}TDSIZE"]

# return aq_time
5 changes: 1 addition & 4 deletions peakfit/noise.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import numpy as np
from lmfit.models import GaussianModel
from numpy.typing import NDArray

from peakfit.cli import Arguments
from peakfit.messages import print_estimated_noise
from peakfit.spectra import Spectra

FloatArray = NDArray[np.float64]
IntArray = NDArray[np.int_]
from peakfit.typing import FloatArray


def prepare_noise_level(clargs: Arguments, spectra: Spectra) -> float:
Expand Down
7 changes: 2 additions & 5 deletions peakfit/peak.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@

import lmfit as lf
import numpy as np
from numpy.typing import NDArray

from peakfit.cli import Arguments
from peakfit.shapes import SHAPES, Shape
from peakfit.spectra import Spectra

FloatArray = NDArray[np.float64]
IntArray = NDArray[np.int_]
from peakfit.typing import FloatArray, IntArray


class Peak:
Expand Down Expand Up @@ -54,7 +51,7 @@ def positions_i(self) -> IntArray:
return np.array([shape.center_i for shape in self.shapes], dtype=np.int_)

@property
def positions_hz(self) -> IntArray:
def positions_hz(self) -> FloatArray:
return np.array(
[shape.spec_params.pts2hz(shape.center_i) for shape in self.shapes],
dtype=np.float64,
Expand Down
Loading

0 comments on commit b935842

Please sign in to comment.