Skip to content

Commit

Permalink
first iteration of pandas fdataframe.py
Browse files: browse the repository at this point in the history
  • Loading branch information
ypriverol committed Sep 20, 2024
1 parent 66d6118 commit 174196a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 54 deletions.
80 changes: 29 additions & 51 deletions fsspark/fs/fdataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler, StandardScaler, RobustScaler
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler, StandardScaler, RobustScaler, LabelEncoder

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("pickfeat")
Expand Down Expand Up @@ -34,8 +34,6 @@ def __init__(
sample_col: str = None,
label_col: str = None,
row_index_col: Optional[str] = '_row_index',
parse_col_names: bool = False,
parse_features: bool = False,
):
"""
Create an instance of FSDataFrame.
Expand All @@ -47,57 +45,37 @@ def __init__(
:param sample_col: Sample id column name
:param label_col: Sample label column name
:param row_index_col: Optional. Column name of row indices.
:param parse_col_names: Replace dots (.) in column names with underscores.
:param parse_features: Coerce all features to float.
"""

self.__sample_col = sample_col
self.__label_col = label_col
self.__row_index_col = row_index_col
self.__df = df

# check input dataframe
self._check_df()

# replace dots in column names, if any.
if parse_col_names:
self.__df = self.__df.toDF(*(c.replace('.', '_') for c in self.__df.columns))

# If the specified row index column name does not exist, add row index to the dataframe
if self.__row_index_col not in self.__df.columns:
self.__df = self._add_row_index(index_name=self.__row_index_col)

if parse_features:
# coerce all features to float
non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_col]
feature_cols = [c for c in self.__df.columns if c not in non_features_cols]
self.__df = self.__df.withColumns({c: self.__df[c].cast('float') for c in feature_cols})

self.__indexed_features = self._set_indexed_cols()
self.__indexed_instances = self._set_indexed_rows()
if sample_col is None:
self.__sample_col = None
self.__samples = []
logging.info("No sample column specified.")
else:
self.__sample_col = sample_col
self.__samples = df[sample_col].tolist()
df = df.drop(columns=[sample_col])

def _check_df(self):
"""
Check if input DataFrame meet the minimal requirements to feed an FS pipeline.
:return: None
"""
col_names = self.__df.columns
if self.__sample_col not in col_names:
raise ValueError(f"Column sample name {self.__sample_col} not found...")
elif self.__label_col not in col_names:
raise ValueError(f"Column label name {self.__label_col} not found...")
elif not isinstance(self.__row_index_col, str):
raise ValueError("Row index column name must be a valid string...")
if label_col is None:
raise ValueError("No label column specified. A class/label column is required.")
else:
pass
self.__label_col = label_col
self.__labels = df[label_col].tolist()
label_encoder = LabelEncoder()
self.__labels_matrix = label_encoder.fit_transform(df[label_col]).tolist()
df = df.drop(columns=[label_col])

self.__original_features = df.columns.tolist()
numerical_df = df.select_dtypes(include=[np.number])
self.__matrix = numerical_df.to_numpy(dtype=np.float32)

def _set_indexed_cols(self) -> Series:
"""
Create a distributed indexed Series representing features.
:return: Pandas on (PoS) Series
"""
non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_col]
features = [f for f in self.__df.columns if f not in non_features_cols]
features = [f for f in self.__matrix.columns if f not in non_features_cols]
return Series(features)

def _set_indexed_rows(self) -> pd.Series:
Expand All @@ -109,8 +87,8 @@ def _set_indexed_rows(self) -> pd.Series:
"""

# Extract the label and row index columns from the DataFrame
labels = self.__df[self.__label_col]
row_indices = self.__df[self.__row_index_col]
labels = self.__matrix[self.__label_col]
row_indices = self.__matrix[self.__row_index_col]

# Create a Pandas Series with row_indices as index and labels as values
return pd.Series(data=labels.values, index=row_indices.values)
Expand Down Expand Up @@ -161,7 +139,7 @@ def get_sdf_vector(self, output_column_vector: str = 'features') -> pd.DataFrame
:return: DataFrame
"""

sdf = self.__df
sdf = self.__matrix
features_cols = self.get_features_names()
sdf_vector = _assemble_column_vector(sdf,
input_feature_cols=features_cols,
Expand Down Expand Up @@ -198,10 +176,10 @@ def to_psdf(self) -> DataFrame:
Convert DataFrame to Pandas on DataFrame
:return: Pandas on DataFrame
"""
return self.__df.pandas_api()
return self.__matrix.pandas_api()

def get_df(self) -> DataFrame:
return self.__df
return self.__matrix

def get_sample_col_name(self) -> str:
"""
Expand Down Expand Up @@ -236,8 +214,8 @@ def _add_row_index(self, index_name: str = '_row_index') -> pd.DataFrame:
:return: DataFrame with extra column of row indices.
"""
# Add a new column with unique row indices using a range
self.__df[index_name] = list(range(len(self.__df)))
return self.__df
self.__matrix[index_name] = list(range(len(self.__matrix)))
return self.__matrix

def count_features(self) -> int:
"""
Expand Down Expand Up @@ -364,7 +342,7 @@ def split_df(self,
"""

label_col = self.get_label_col_name()
df = self.__df.copy()
df = self.__matrix.copy()

# Create a temporary label column for sampling
tmp_label_col = '_tmp_label_indexed'
Expand Down
4 changes: 1 addition & 3 deletions fsspark/tests/test_fsdataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ def test_initializes_fsdataframe():
sample_col='sample_id',
label_col='label',
row_index_col='_row_index',
parse_col_names=False,
parse_features=False
)

# Assertions to check if the initialization is correct
assert (fs_df.get_sdf(), df)
assert isinstance(fs_df, FSDataFrame)

0 comments on commit 174196a

Please sign in to comment.