From 174196a853439d24683e109fd117fb5e12453af0 Mon Sep 17 00:00:00 2001
From: Yasset Perez-Riverol <ypriverol@gmail.com>
Date: Fri, 20 Sep 2024 14:55:19 +0200
Subject: [PATCH] first iteration of pandas fdataframe.py

---
 fsspark/fs/fdataframe.py          | 80 +++++++++++--------------------
 fsspark/tests/test_fsdataframe.py |  4 +-
 2 files changed, 30 insertions(+), 54 deletions(-)

diff --git a/fsspark/fs/fdataframe.py b/fsspark/fs/fdataframe.py
index 21cc06c..cf590b7 100644
--- a/fsspark/fs/fdataframe.py
+++ b/fsspark/fs/fdataframe.py
@@ -4,7 +4,7 @@
 import numpy as np
 import pandas as pd
 from pandas import DataFrame, Series
-from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler, StandardScaler, RobustScaler
+from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler, StandardScaler, RobustScaler, LabelEncoder
 
 logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
 logger = logging.getLogger("pickfeat")
@@ -34,8 +34,6 @@ def __init__(
         sample_col: str = None,
         label_col: str = None,
         row_index_col: Optional[str] = '_row_index',
-        parse_col_names: bool = False,
-        parse_features: bool = False,
     ):
         """
         Create an instance of FSDataFrame.
@@ -47,49 +45,29 @@
         :param sample_col: Sample id column name
         :param label_col: Sample label column name
         :param row_index_col: Optional. Column name of row indices.
-        :param parse_col_names: Replace dots (.) in column names with underscores.
-        :param parse_features: Coerce all features to float.
         """
-        self.__sample_col = sample_col
-        self.__label_col = label_col
-        self.__row_index_col = row_index_col
-        self.__df = df
-
-        # check input dataframe
-        self._check_df()
-
-        # replace dots in column names, if any.
-        if parse_col_names:
-            self.__df = self.__df.toDF(*(c.replace('.', '_') for c in self.__df.columns))
-
-        # If the specified row index column name does not exist, add row index to the dataframe
-        if self.__row_index_col not in self.__df.columns:
-            self.__df = self._add_row_index(index_name=self.__row_index_col)
-
-        if parse_features:
-            # coerce all features to float
-            non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_col]
-            feature_cols = [c for c in self.__df.columns if c not in non_features_cols]
-            self.__df = self.__df.withColumns({c: self.__df[c].cast('float') for c in feature_cols})
-
-        self.__indexed_features = self._set_indexed_cols()
-        self.__indexed_instances = self._set_indexed_rows()
+        if sample_col is None:
+            self.__sample_col = None
+            self.__samples = []
+            logging.info("No sample column specified.")
+        else:
+            self.__sample_col = sample_col
+            self.__samples = df[sample_col].tolist()
+            df = df.drop(columns=[sample_col])
 
-    def _check_df(self):
-        """
-        Check if input DataFrame meet the minimal requirements to feed an FS pipeline.
-
-        :return: None
-        """
-        col_names = self.__df.columns
-        if self.__sample_col not in col_names:
-            raise ValueError(f"Column sample name {self.__sample_col} not found...")
-        elif self.__label_col not in col_names:
-            raise ValueError(f"Column label name {self.__label_col} not found...")
-        elif not isinstance(self.__row_index_col, str):
-            raise ValueError("Row index column name must be a valid string...")
+        if label_col is None:
+            raise ValueError("No label column specified. A class/label column is required.")
         else:
-            pass
+            self.__label_col = label_col
+            self.__labels = df[label_col].tolist()
+            label_encoder = LabelEncoder()
+            self.__labels_matrix = label_encoder.fit_transform(df[label_col]).tolist()
+            df = df.drop(columns=[label_col])
+
+        self.__original_features = df.columns.tolist()
+        numerical_df = df.select_dtypes(include=[np.number])
+        self.__matrix = numerical_df.to_numpy(dtype=np.float32)
 
     def _set_indexed_cols(self) -> Series:
         """
@@ -97,7 +75,7 @@
         :return: Pandas on (PoS) Series
         """
         non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_col]
-        features = [f for f in self.__df.columns if f not in non_features_cols]
+        features = [f for f in self.__matrix.columns if f not in non_features_cols]
         return Series(features)
 
     def _set_indexed_rows(self) -> pd.Series:
         """
@@ -109,8 +87,8 @@
         """
 
         # Extract the label and row index columns from the DataFrame
-        labels = self.__df[self.__label_col]
-        row_indices = self.__df[self.__row_index_col]
+        labels = self.__matrix[self.__label_col]
+        row_indices = self.__matrix[self.__row_index_col]
 
         # Create a Pandas Series with row_indices as index and labels as values
         return pd.Series(data=labels.values, index=row_indices.values)
@@ -161,7 +139,7 @@ def get_sdf_vector(self, output_column_vector: str = 'features') -> pd.DataFrame
 
         :return: DataFrame
         """
-        sdf = self.__df
+        sdf = self.__matrix
         features_cols = self.get_features_names()
         sdf_vector = _assemble_column_vector(sdf,
                                              input_feature_cols=features_cols,
@@ -198,10 +176,10 @@ def to_psdf(self) -> DataFrame:
         Convert DataFrame to Pandas on DataFrame
         :return: Pandas on DataFrame
         """
-        return self.__df.pandas_api()
+        return self.__matrix.pandas_api()
 
     def get_df(self) -> DataFrame:
-        return self.__df
+        return self.__matrix
 
     def get_sample_col_name(self) -> str:
         """
@@ -236,8 +214,8 @@ def _add_row_index(self, index_name: str = '_row_index') -> pd.DataFrame:
         :return: DataFrame with extra column of row indices.
         """
         # Add a new column with unique row indices using a range
-        self.__df[index_name] = list(range(len(self.__df)))
-        return self.__df
+        self.__matrix[index_name] = list(range(len(self.__matrix)))
+        return self.__matrix
 
     def count_features(self) -> int:
         """
@@ -364,7 +342,7 @@ def split_df(self,
 
         """
         label_col = self.get_label_col_name()
-        df = self.__df.copy()
+        df = self.__matrix.copy()
 
         # Create a temporary label column for sampling
         tmp_label_col = '_tmp_label_indexed'
diff --git a/fsspark/tests/test_fsdataframe.py b/fsspark/tests/test_fsdataframe.py
index 09fc2ac..5b56a23 100644
--- a/fsspark/tests/test_fsdataframe.py
+++ b/fsspark/tests/test_fsdataframe.py
@@ -19,9 +19,7 @@ def test_initializes_fsdataframe():
         sample_col='sample_id',
         label_col='label',
         row_index_col='_row_index',
-        parse_col_names=False,
-        parse_features=False
     )
 
     # Assertions to check if the initialization is correct
-    assert (fs_df.get_sdf(), df)
\ No newline at end of file
+    assert isinstance(fs_df, FSDataFrame)
\ No newline at end of file
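
Usage note (illustrative, not part of the commit): a minimal sketch of how the refactored,
pandas-backed FSDataFrame constructor is exercised, mirroring the patched test above. The toy
DataFrame, its column values, and the feature names are assumptions made for this example only:

    import pandas as pd
    from fsspark.fs.fdataframe import FSDataFrame

    # Hypothetical toy input: a sample id column, a class label column, and two
    # numeric feature columns (any numeric columns become the float32 matrix).
    df = pd.DataFrame({
        'sample_id': ['s1', 's2', 's3'],
        'label': ['case', 'control', 'case'],
        'feature_1': [0.1, 0.2, 0.3],
        'feature_2': [1.0, 2.0, 3.0],
    })

    # As in test_initializes_fsdataframe: the label column is mandatory; labels are
    # encoded with LabelEncoder and the remaining numeric columns form the matrix.
    fs_df = FSDataFrame(
        df=df,
        sample_col='sample_id',
        label_col='label',
        row_index_col='_row_index',
    )
    assert isinstance(fs_df, FSDataFrame)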