Commit

added some testing

mpecchi committed Aug 29, 2024
1 parent 0e3c44c commit 863e427
Showing 1 changed file with 86 additions and 14 deletions.
100 changes: 86 additions & 14 deletions src/gcms_data_analysis/gcms.py
@@ -149,6 +149,7 @@ def __init__(
self.list_of_all_compounds: list[str] | None = None
self.compounds_properties: pd.DataFrame | None = None
self.dict_names_to_iupacs: dict[str, str] | None = None
self.deriv_to_underiv_names: dict[str, str] | None = None

self.deriv_list_of_all_compounds: list[str] | None = None
self.deriv_files_present: bool = False
@@ -415,6 +416,36 @@ def create_compounds_properties(
print("Info: compounds_properties created")
return self.compounds_properties

def create_deriv_to_underiv_names(
self, update_saved_files_info: bool = False
) -> dict[str, str]:
"""creates a dict of names to be used for the derivatized compounds,
that needs to be given at the beginning of the project setting"""
print("Info: create_compounds_properties: started")

if self.list_of_all_compounds is None:
self.create_list_of_all_compounds()
deriv_to_underiv_names = {}
for comp in self.list_of_all_compounds:
if any(der_str in comp for der_str in self.string_in_deriv_names):
# drop the derivatization suffix after the last comma, keep the rest unchanged
under_comp = comp.rsplit(",", 1)[0]
pcp_under_comp = get_compound_from_pubchempy(under_comp)
if pcp_under_comp is not None:
try:
iupac_name = pcp_under_comp.iupac_name.lower()
deriv_to_underiv_names[comp] = iupac_name
except AttributeError:  # iupac_name not given (None on PubChem)
pass
self.deriv_to_underiv_names = deriv_to_underiv_names
# save the mapping in the project folder as an excel file
if update_saved_files_info:
# create a single-column df from the dict
ddsn = pd.DataFrame.from_dict(
self.deriv_to_underiv_names, orient="index", columns=["underiv_name"]
)
ddsn.index.name = "comp_name"
ddsn.to_excel(plib.Path(self.folder_path, "deriv_to_underiv_names.xlsx"))
print("Info: deriv_to_underiv_names created")
return self.deriv_to_underiv_names
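
A minimal sketch of the mapping step above, with pubchempy standing in for the project's get_compound_from_pubchempy helper (the compound name and the direct pubchempy call are illustrative assumptions):

import pubchempy as pcp

comp = "hexadecanoic acid, TMS derivative"
# drop the derivatization suffix after the last comma
under_comp = comp.rsplit(",", 1)[0]  # "hexadecanoic acid"
matches = pcp.get_compounds(under_comp, "name")
if matches and matches[0].iupac_name is not None:
    print({comp: matches[0].iupac_name.lower()})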

def load_compounds_properties(self) -> pd.DataFrame:
"""Attempts to load the 'compounds_properties.xlsx' file containing physical
and chemical properties of compounds. If not found, it creates a new properties
@@ -429,6 +460,20 @@ def load_compounds_properties(self) -> pd.DataFrame:
cpdf = self.create_compounds_properties()
return self.compounds_properties

def load_deriv_to_underiv_names(self) -> dict[str, str]:
"""Attempts to load the 'deriv_to_underiv_names.xlsx' file.
If not found, it creates the mapping from scratch."""
prop_path = plib.Path(self.folder_path, "deriv_to_underiv_names.xlsx")
if prop_path.exists():
df = pd.read_excel(prop_path, index_col="comp_name")
# squeeze the single-column dataframe into a dict[str, str]
self.deriv_to_underiv_names = df.iloc[:, 0].to_dict()
print("Info: deriv_to_underiv_names loaded")
else:
print("Warning: deriv_to_underiv_names.xlsx not found")
self.deriv_to_underiv_names = self.create_deriv_to_underiv_names()
return self.deriv_to_underiv_names
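
A round-trip sketch of the save/load convention used above, on a toy mapping (path and names are illustrative; the excel I/O needs openpyxl):

import pathlib as plib
import pandas as pd

mapping = {"hexadecanoic acid, TMS derivative": "hexadecanoic acid"}
ddsn = pd.DataFrame.from_dict(mapping, orient="index", columns=["underiv_name"])
ddsn.index.name = "comp_name"
path = plib.Path("deriv_to_underiv_names.xlsx")
ddsn.to_excel(path)
loaded = pd.read_excel(path, index_col="comp_name").iloc[:, 0].to_dict()
assert loaded == mapping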

def create_dict_names_to_iupacs(self) -> dict[str, str]:
if self.compounds_properties is None:
self.load_compounds_properties()
@@ -503,15 +548,15 @@ def create_tanimoto_and_molecular_weight_similarity_dfs(
mw_diff_df.loc[iupac, :] = np.abs(calib_mws - weight)
self.tanimoto_similarity_df[calibrationname] = tan_sim_df
self.molecular_weight_diff_df[calibrationname] = mw_diff_df
return (
self.tanimoto_similarity_df[calibrationname],
self.molecular_weight_diff_df[calibrationname],
)
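
For context, a self-contained sketch of a single Tanimoto similarity value like those filling the dataframe above, assuming RDKit Morgan fingerprints (whether this project computes similarity exactly this way is an assumption):

from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem

fp_a = AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles("CCO"), 2, nBits=2048)
fp_b = AllChem.GetMorganFingerprintAsBitVect(Chem.MolFromSmiles("CCCO"), 2, nBits=2048)
print(DataStructs.TanimotoSimilarity(fp_a, fp_b))  # value in [0, 1]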

def create_semi_calibration_dict(self) -> dict[str, dict[str, str]]:
if not self.tanimoto_similarity_df or not self.molecular_weight_diff_df:
self.create_tanimoto_and_molecular_weight_similarity_dfs()
for calibrationname in self.calibrations:
if self.tanimoto_similarity_threshold is not None:
all_valid_ts = self.tanimoto_similarity_df[calibrationname].where(
self.tanimoto_similarity_df[calibrationname]
@@ -538,7 +583,7 @@ def create_semi_calibration_dict(self) -> dict[str, dict[str, str]]:
for k in best_valid_ts
if k in best_valid_mw and best_valid_ts[k] == best_valid_mw[k]
}
return self.semi_calibration_dict
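
The comprehension above accepts a semi-calibration match only when the Tanimoto-best and the molecular-weight-best calibrants agree; a toy illustration with made-up names:

best_valid_ts = {"decane": "dodecane", "phenol": "cresol"}
best_valid_mw = {"decane": "dodecane", "phenol": "anisole"}
agreed = {
    k: best_valid_ts[k]
    for k in best_valid_ts
    if k in best_valid_mw and best_valid_ts[k] == best_valid_mw[k]
}
print(agreed)  # {'decane': 'dodecane'}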

def apply_calibration_to_files(self):
"""Applies the appropriate calibration curve to each compound
@@ -741,12 +786,19 @@ def create_single_sample_from_files(
# Step 1: Create a comprehensive index of all unique compounds
all_compounds = pd.Index([])
for df in files_in_sample:
all_compounds = all_compounds.union(df.index)

# keep only unique compounds (the union should already be unique)
all_compounds = all_compounds.drop_duplicates()
# sanity check: all_compounds must not contain duplicates
if all_compounds.duplicated().any():
raise ValueError("all_compounds has duplicates")
# Step 2: Align all DataFrames to the comprehensive index
aligned_dfs: list[pd.DataFrame] = []
for df in files_in_sample:
# if a file's index has duplicates, keep only the last occurrence
if df.index.duplicated().any():
df = df[~df.index.duplicated(keep="last")]
aligned_dfs.append(df.reindex(all_compounds))
# Fill NaN values for numerical columns after alignment and before concatenation
filled_dfs = [df.fillna(0.0) for df in aligned_dfs]
# Keep non-numerical data separately and ensure no duplicates
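
A compact sketch of the union/align/fill steps above on two toy files (compound and column names are illustrative):

import pandas as pd

df1 = pd.DataFrame({"conc_vial_mg_L": [1.0, 2.0]}, index=["decane", "phenol"])
df2 = pd.DataFrame({"conc_vial_mg_L": [3.0]}, index=["phenol"])
all_compounds = pd.Index([]).union(df1.index).union(df2.index)
aligned = [df.reindex(all_compounds).fillna(0.0) for df in (df1, df2)]
print(aligned[1])  # decane appears with 0.0 where it was not detected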
@@ -823,10 +875,20 @@ def create_files_param_report(self, param="conc_vial_mg_L"):
if param not in self.files[filename].columns:
raise ValueError(f"{param = } not found in {filename = }")
# Create a dictionary of Series, each Series named after the file and containing the 'param' values
series_dict = {}
for filename in self.files_info.index:
# check if there are duplicates in the index
if self.files[filename].index.duplicated().any():
# if so, keep only the last occurrence of each duplicated entry
self.files[filename] = self.files[filename][
~self.files[filename].index.duplicated(keep="last")
]
series_dict[filename] = self.files[filename][param].rename(filename)

# Get the union of all indices from the individual DataFrames
rep = pd.concat(
series_dict.values(), axis=1, keys=series_dict.keys(), join="outer"
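
A minimal sketch of the report assembly: named Series joined on the union of their indices, so compounds missing from a file show up as NaN (file and compound names are illustrative):

import pandas as pd

series_dict = {
    "file_a": pd.Series({"decane": 1.0, "phenol": 2.0}, name="file_a"),
    "file_b": pd.Series({"phenol": 3.0}, name="file_b"),
}
rep = pd.concat(series_dict.values(), axis=1, keys=series_dict.keys(), join="outer")
print(rep)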
@@ -1007,6 +1069,7 @@ def plot_report(
show_total_in_twinx: bool = False,
y_axis_min_threshold: float | None = None,
item_to_color_to_hatch: pd.DataFrame | None = None,
items_to_rename: dict[str, str] | None = None,
alternative_colors: list[tuple] | None = None,
yt_sum_label: str = "total\n(right axis)",
remove_insignificant_values: bool = False,
@@ -1052,6 +1115,15 @@ def plot_report(
if files_or_samples == "samples":
df_std.index = labels

# if items_to_rename is not None, rename the df columns accordingly
# and merge columns that end up with the same name
if items_to_rename is not None:
df_ave.columns = [items_to_rename.get(c, c) for c in df_ave.columns]
df_ave = df_ave.groupby(df_ave.columns, axis=1).sum()
if files_or_samples == "samples":
df_std.columns = [items_to_rename.get(c, c) for c in df_std.columns]
df_std = df_std.groupby(df_std.columns, axis=1).sum()
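
A toy run of the rename-and-merge step above (names are illustrative; note that groupby with axis=1, as used in the code, is deprecated in recent pandas releases but still works):

import pandas as pd

df_ave = pd.DataFrame({"phenol": [1.0], "cresol": [2.0], "decane": [0.5]})
items_to_rename = {"cresol": "phenol"}
df_ave.columns = [items_to_rename.get(c, c) for c in df_ave.columns]
df_ave = df_ave.groupby(df_ave.columns, axis=1).sum()
print(df_ave)  # the phenol column now holds 3.0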

if y_axis_min_threshold is not None:
df_ave = df_ave.loc[:, (df_ave > y_axis_min_threshold).any(axis=0)].copy()
if files_or_samples == "samples":