Skip to content

Commit

Permalink
first iteration of pandas fdataframe.py
Browse files Browse the repository at this point in the history
  • Loading branch information
ypriverol committed Sep 19, 2024
1 parent 70fec44 commit b99aee0
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 100 deletions.
1 change: 0 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ dependencies:
- pyspark~=3.3.0
- networkx~=2.8.7
- numpy~=1.23.4
- pandas~=1.5.1
- pyarrow~=8.0.0
41 changes: 21 additions & 20 deletions fsspark/fs/fdataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,25 @@ def __init__(
:param parse_features: Coerce all features to float.
"""

self.__df = self._convert_psdf_to_sdf(df)
self.__sample_col = sample_col
self.__label_col = label_col
self.__row_index_name = row_index_col
self.__row_index_col = row_index_col
self.__df = df

# check input dataframe
self._check_df()

# replace dots in column names, if any.
if parse_col_names:
# TODO: Dots in column names are prone to errors, since dots are used to access attributes from DataFrame.
# Should we make this replacement optional? Or print out a warning?
self.__df = self.__df.toDF(*(c.replace('.', '_') for c in self.__df.columns))

# If the specified row index column name does not exist, add row index to the dataframe
if self.__row_index_name not in self.__df.columns:
self.__df = self._add_row_index(index_name=self.__row_index_name)
if self.__row_index_col not in self.__df.columns:
self.__df = self._add_row_index(index_name=self.__row_index_col)

if parse_features:
# coerce all features to float
non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_name]
non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_col]
feature_cols = [c for c in self.__df.columns if c not in non_features_cols]
self.__df = self.__df.withColumns({c: self.__df[c].cast('float') for c in feature_cols})

Expand All @@ -87,7 +85,7 @@ def _check_df(self):
raise ValueError(f"Column sample name {self.__sample_col} not found...")
elif self.__label_col not in col_names:
raise ValueError(f"Column label name {self.__label_col} not found...")
elif not isinstance(self.__row_index_name, str):
elif not isinstance(self.__row_index_col, str):
raise ValueError("Row index column name must be a valid string...")
else:
pass
Expand All @@ -97,21 +95,24 @@ def _set_indexed_cols(self) -> Series:
Create a distributed indexed Series representing features.
:return: Pandas on (PoS) Series
"""
non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_name]
non_features_cols = [self.__sample_col, self.__label_col, self.__row_index_col]
features = [f for f in self.__df.columns if f not in non_features_cols]
return Series(features)

def _set_indexed_rows(self) -> Series:
def _set_indexed_rows(self) -> pd.Series:
"""
Create a distributed indexed Series representing samples labels.
It will use existing row indices, if any.
Create an indexed Series representing sample labels.
It will use existing row indices from the DataFrame.
:return: Pandas on (PoS) Series
:return: Pandas Series
"""
# TODO: Check for equivalent to pandas distributed Series in .
label = self.__df.select(self.__label_col).collect()
row_index = self.__df.select(self.__row_index_name).collect()
return Series(label, index=row_index)

# Extract the label and row index columns from the DataFrame
labels = self.__df[self.__label_col]
row_indices = self.__df[self.__row_index_col]

# Create a Pandas Series with row_indices as index and labels as values
return pd.Series(data=labels.values, index=row_indices.values)

def get_features_indexed(self) -> Series:
"""
Expand Down Expand Up @@ -223,7 +224,7 @@ def get_row_index_name(self) -> str:
:return: Row id column name.
"""
return self.__row_index_name
return self.__row_index_col

def _add_row_index(self, index_name: str = '_row_index') -> pd.DataFrame:
"""
Expand Down Expand Up @@ -276,12 +277,12 @@ def filter_features(self, features: List[str], keep: bool = True) -> 'FSDataFram
sdf = sdf.select(
self.__sample_col,
self.__label_col,
self.__row_index_name,
self.__row_index_col,
*features)
else:
sdf = sdf.drop(*features)

fsdf_filtered = self.update(sdf, self.__sample_col, self.__label_col, self.__row_index_name)
fsdf_filtered = self.update(sdf, self.__sample_col, self.__label_col, self.__row_index_col)
count_b = fsdf_filtered.count_features()

logger.info(f"{count_b} features out of {count_a} remain after applying this filter...")
Expand Down
79 changes: 0 additions & 79 deletions fsspark/tests/test_FSDataFrame.py

This file was deleted.

27 changes: 27 additions & 0 deletions fsspark/tests/test_fsdataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import pytest
import pandas as pd
from fsspark.fs.fdataframe import FSDataFrame

def test_initializes_fsdataframe():

# Create a sample DataFrame
data = {
'sample_id': [1, 2, 3],
'label': ['A', 'B', 'C'],
'feature1': [0.1, 0.2, 0.3],
'feature2': [1.1, 1.2, 1.3]
}
df = pd.DataFrame(data)

# Initialize FSDataFrame
fs_df = FSDataFrame(
df=df,
sample_col='sample_id',
label_col='label',
row_index_col='_row_index',
parse_col_names=False,
parse_features=False
)

# Assertions to check if the initialization is correct
assert (fs_df.get_sdf(), df)

0 comments on commit b99aee0

Please sign in to comment.