From 863e4272d77f820d4b9b4dee60f24840f284af70 Mon Sep 17 00:00:00 2001
From: mpecchi
Date: Thu, 29 Aug 2024 13:13:01 -0400
Subject: [PATCH] add deriv-to-underiv name mapping, duplicate-index handling,
 and column merging in plot_report

---
 src/gcms_data_analysis/gcms.py | 98 ++++++++++++++++++++++++++++-----
 1 file changed, 84 insertions(+), 14 deletions(-)

diff --git a/src/gcms_data_analysis/gcms.py b/src/gcms_data_analysis/gcms.py
index a00dfd2..e84b82e 100644
--- a/src/gcms_data_analysis/gcms.py
+++ b/src/gcms_data_analysis/gcms.py
@@ -149,6 +149,7 @@ def __init__(
         self.list_of_all_compounds: list[str] | None = None
         self.compounds_properties: pd.DataFrame | None = None
         self.dict_names_to_iupacs: dict[str, str] | None = None
+        self.deriv_to_underiv_names: dict[str, str] | None = None
 
         self.deriv_list_of_all_compounds: list[str] | None = None
         self.deriv_files_present: bool = False
@@ -415,6 +416,38 @@ def create_compounds_properties(
         print("Info: compounds_properties created")
         return self.compounds_properties
 
+    def create_deriv_to_underiv_names(
+        self, update_saved_files_info: bool = False
+    ) -> dict[str, str]:
+        """Creates a dict that maps each derivatized compound name to the IUPAC
+        name of its underivatized form, based on the string_in_deriv_names markers."""
+        print("Info: create_deriv_to_underiv_names: started")
+
+        if self.list_of_all_compounds is None:
+            self.create_list_of_all_compounds()
+        deriv_to_underiv_names = {}
+        for comp in self.list_of_all_compounds:
+            if any(der_str in comp for der_str in self.string_in_deriv_names):
+                # drop the derivatization suffix after the last comma, keep the rest
+                under_comp = comp.rsplit(",", 1)[0]
+                pcp_under_comp = get_compound_from_pubchempy(under_comp)
+                if pcp_under_comp is not None:
+                    try:
+                        iupac_name = pcp_under_comp.iupac_name.lower()
+                        deriv_to_underiv_names[comp] = iupac_name
+                    except AttributeError:  # iupac_name is None when unavailable
+                        pass
+        self.deriv_to_underiv_names = deriv_to_underiv_names
+        # save the dict in the project folder so it can be reloaded later
+        if update_saved_files_info:
+            ddsn = pd.DataFrame.from_dict(
+                self.deriv_to_underiv_names, orient="index", columns=["underiv_name"]
+            )
+            ddsn.index.name = "comp_name"
+            ddsn.to_excel(plib.Path(self.folder_path, "deriv_to_underiv_names.xlsx"))
+        print("Info: deriv_to_underiv_names created")
+        return self.deriv_to_underiv_names
+
     def load_compounds_properties(self) -> pd.DataFrame:
         """Attempts to load the 'compounds_properties.xlsx' file containing physical
         and chemical properties of compounds. If not found, it creates a new properties
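The mapping built above relies on two assumptions worth keeping in mind: library names for derivatized compounds carry the derivatization moiety after the last comma (e.g. "hexadecanoic acid, trimethylsilyl ester"), and PubChem can resolve the remaining stem to an IUPAC name. A minimal standalone sketch of the same logic, calling pubchempy directly instead of the project's get_compound_from_pubchempy wrapper (the marker list is illustrative, not the project default):

    import pubchempy as pcp

    string_in_deriv_names = ["trimethylsilyl", "TMS"]  # hypothetical markers

    def underivatized_iupac(comp: str) -> str | None:
        """Return the lowercased IUPAC name of the underivatized stem, or None."""
        if not any(marker in comp for marker in string_in_deriv_names):
            return None  # not a derivatized name
        stem = comp.rsplit(",", 1)[0]  # drop the suffix after the last comma
        hits = pcp.get_compounds(stem, "name")  # PubChem lookup by common name
        if hits and hits[0].iupac_name:
            return hits[0].iupac_name.lower()
        return None  # no hit, or PubChem lists no IUPAC name for it

    print(underivatized_iupac("hexadecanoic acid, trimethylsilyl ester"))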
@@ -429,6 +462,20 @@ def load_compounds_properties(self) -> pd.DataFrame:
             cpdf = self.create_compounds_properties()
         return self.compounds_properties
 
+    def load_deriv_to_underiv_names(self) -> dict[str, str]:
+        """Attempts to load the 'deriv_to_underiv_names.xlsx' file.
+        If not found, it creates a new dict."""
+        prop_path = plib.Path(self.folder_path, "deriv_to_underiv_names.xlsx")
+        if prop_path.exists():
+            df = pd.read_excel(prop_path, index_col="comp_name")
+            # squeeze the single-column frame into a {comp_name: underiv_name} dict
+            self.deriv_to_underiv_names = df["underiv_name"].to_dict()
+            print("Info: deriv_to_underiv_names loaded")
+        else:
+            print("Warning: deriv_to_underiv_names.xlsx not found")
+            self.deriv_to_underiv_names = self.create_deriv_to_underiv_names()
+        return self.deriv_to_underiv_names
+
     def create_dict_names_to_iupacs(self) -> dict[str, str]:
         if self.compounds_properties is None:
             self.load_compounds_properties()
@@ -503,15 +550,15 @@ def create_tanimoto_and_molecular_weight_similarity_dfs(
                 mw_diff_df.loc[iupac, :] = np.abs(calib_mws - weight)
             self.tanimoto_similarity_df[calibrationname] = tan_sim_df
             self.molecular_weight_diff_df[calibrationname] = mw_diff_df
-            return (
-                self.tanimoto_similarity_df[calibrationname],
-                self.molecular_weight_diff_df[calibrationname],
-            )
+        return (
+            self.tanimoto_similarity_df[calibrationname],
+            self.molecular_weight_diff_df[calibrationname],
+        )
 
     def create_semi_calibration_dict(self) -> dict[str, dict[str, str]]:
         if not self.tanimoto_similarity_df or not self.molecular_weight_diff_df:
             self.create_tanimoto_and_molecular_weight_similarity_dfs()
-        for calibrationname in self.calibrations.keys():
+        for calibrationname in self.calibrations:
             if self.tanimoto_similarity_threshold is not None:
                 all_valid_ts = self.tanimoto_similarity_df[calibrationname].where(
                     self.tanimoto_similarity_df[calibrationname]
@@ -538,7 +585,7 @@ def create_semi_calibration_dict(self) -> dict[str, dict[str, str]]:
                 for k in best_valid_ts.keys()
                 if k in best_valid_mw and best_valid_ts[k] == best_valid_mw[k]
             }
-            return self.semi_calibration_dict[calibrationname]
+        return self.semi_calibration_dict
 
     def apply_calibration_to_files(self):
         """Applies the appropriate calibration curve to each compound
@@ -741,12 +788,20 @@ def create_single_sample_from_files(
         # Step 1: Create a comprehensive index of all unique compounds
         all_compounds = pd.Index([])
         for df in files_in_sample:
             all_compounds = all_compounds.union(df.index)
-
+        # keep only unique compounds (union can preserve duplicates that occur
+        # within a single file's index)
+        all_compounds = all_compounds.drop_duplicates()
+        # sanity check: the comprehensive index must now be unique
+        if all_compounds.duplicated().any():
+            raise ValueError("all_compounds has duplicates")
         # Step 2: Align all DataFrames to the comprehensive index
-        aligned_dfs: list[pd.DataFrame] = [
-            df.reindex(all_compounds) for df in files_in_sample
-        ]
+        aligned_dfs: list[pd.DataFrame] = []
+        for df in files_in_sample:
+            # reindex needs unique labels: keep only the last occurrence of each
+            if df.index.duplicated().any():
+                df = df[~df.index.duplicated(keep="last")]
+            aligned_dfs.append(df.reindex(all_compounds))
         # Fill NaN values for numerical columns after alignment and before concatenation
         filled_dfs = [df.fillna(0.0) for df in aligned_dfs]
         # Keep non-numerical data separately and ensure no duplicates
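Both the hunk above and the report hunk below lean on the same pandas idiom: index.duplicated(keep="last") flags every occurrence of a repeated label except the last, so negating the mask keeps exactly one row per compound. A toy illustration with invented data:

    import pandas as pd

    df = pd.DataFrame(
        {"conc_vial_mg_L": [1.0, 2.0, 3.0]},
        index=["phenol", "phenol", "toluene"],
    )
    # mask is [True, False, False]: only the first "phenol" row is dropped
    deduped = df[~df.index.duplicated(keep="last")]
    print(deduped)  # keeps phenol=2.0 and toluene=3.0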
@@ -823,10 +878,15 @@ def create_files_param_report(self, param="conc_vial_mg_L"):
             if param not in self.files[filename].columns:
                 raise ValueError(f"{param = } not found in {filename = }")
         # Create a dictionary of Series, each Series named after the file and containing the 'param' values
-        series_dict = {
-            filename: self.files[filename][param].rename(filename)
-            for filename in self.files_info.index
-        }
+        series_dict = {}
+        for filename in self.files_info.index:
+            # check if there are duplicates in the index
+            if self.files[filename].index.duplicated().any():
+                # if so, keep only the last occurrence of each duplicated compound
+                self.files[filename] = self.files[filename][
+                    ~self.files[filename].index.duplicated(keep="last")
+                ]
+            series_dict[filename] = self.files[filename][param].rename(filename)
         # Get the union of all indices from the individual DataFrames
         rep = pd.concat(
             series_dict.values(), axis=1, keys=series_dict.keys(), join="outer"
@@ -1007,6 +1067,7 @@ def plot_report(
         show_total_in_twinx: bool = False,
         y_axis_min_threshold: float | None = None,
         item_to_color_to_hatch: pd.DataFrame | None = None,
+        items_to_rename: dict[str, str] | None = None,
        alternative_colors: list[tuple] = None,
         yt_sum_label: str = "total\n(right axis)",
         remove_insignificant_values: bool = False,
@@ -1052,6 +1113,15 @@ def plot_report(
         if files_or_samples == "samples":
             df_std.index = labels
 
+        # if items_to_rename is given, rename the df columns accordingly and
+        # merge columns that end up with the same name
+        if items_to_rename is not None:
+            df_ave.columns = [items_to_rename.get(c, c) for c in df_ave.columns]
+            df_ave = df_ave.T.groupby(level=0).sum().T
+            if files_or_samples == "samples":
+                df_std.columns = [items_to_rename.get(c, c) for c in df_std.columns]
+                df_std = df_std.T.groupby(level=0).sum().T
+
         if y_axis_min_threshold is not None:
             df_ave = df_ave.loc[:, (df_ave > y_axis_min_threshold).any(axis=0)].copy()
             if files_or_samples == "samples":
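items_to_rename maps plotted column labels to replacement names; columns that end up sharing a name are summed together, and the transpose-groupby-transpose form sidesteps the axis=1 groupby that pandas 2.x deprecates. A short sketch on invented data (the rename rule is hypothetical):

    import pandas as pd

    df_ave = pd.DataFrame(
        {"phenol": [1.0, 2.0], "2-methylphenol": [0.5, 0.5], "toluene": [3.0, 4.0]},
        index=["sample_a", "sample_b"],
    )
    items_to_rename = {"2-methylphenol": "phenol"}  # merge the cresol into phenol

    df_ave.columns = [items_to_rename.get(c, c) for c in df_ave.columns]
    # group the renamed columns by label and sum the collisions
    df_ave = df_ave.T.groupby(level=0).sum().T
    print(df_ave)  # phenol becomes 1.5 / 2.5; toluene is unchanged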