From eb6113e6208ca513625b9e81922a5c110f7cc315 Mon Sep 17 00:00:00 2001 From: Diego Marvid Date: Thu, 18 Apr 2024 14:35:40 -0300 Subject: [PATCH] update calculate features --- pipeline_lib/core/steps/calculate_features.py | 61 +++++++++++++------ 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/pipeline_lib/core/steps/calculate_features.py b/pipeline_lib/core/steps/calculate_features.py index a303114..12956c2 100644 --- a/pipeline_lib/core/steps/calculate_features.py +++ b/pipeline_lib/core/steps/calculate_features.py @@ -52,18 +52,31 @@ def __init__( f"Unsupported datetime features: {unsupported_features}" ) - def _convert_column_to_datetime(self, df: pd.DataFrame, column: str) -> None: + if self.datetime_columns and not self.features: + raise ValueError( + "No datetime features specified. Must specify at least one feature. Possible" + f" features: {list(self.feature_extractors.keys())}" + ) + + def _convert_column_to_datetime(self, df: pd.DataFrame, column: str) -> pd.DataFrame: """Convert a column to datetime.""" # Check if the column is already a datetime type if not is_datetime64_any_dtype(df[column]): try: - df.loc[:, column] = pd.to_datetime(df[column], errors="raise") - self.logger.info(f"Column '{column}' converted to datetime.") - except Exception as e: + df[column] = pd.to_datetime( + df[column], + errors="raise", + ) + self.logger.info(f"Column '{column}' automatically converted to datetime.") + except ValueError as e: self.logger.error(f"Error converting column '{column}' to datetime: {e}") + except Exception as e: + self.logger.error(f"Unexpected error converting column '{column}' to datetime: {e}") else: self.logger.debug(f"Column '{column}' is already a datetime type.") + return df + def _extract_feature(self, df: pd.DataFrame, column: str, feature: str) -> None: """Extract a single feature from a datetime column.""" extractor = self.feature_extractors[feature] @@ -73,33 +86,47 @@ def execute(self, data: DataContainer) -> DataContainer: """Execute the step.""" self.logger.info("Calculating features") - df = data.flow + data.train = self._create_datetime_features(data.train, log=True) + + if data.validation is not None: + data.validation = self._create_datetime_features(data.validation) + + if data.test is not None: + data.test = self._create_datetime_features(data.test) + + return data + + def _create_datetime_features( + self, df: pd.DataFrame, log: Optional[bool] = False + ) -> pd.DataFrame: + """Create datetime features.""" created_features = [] if self.datetime_columns: for column in self.datetime_columns: if column in df.columns: - self._convert_column_to_datetime(df, column) + df = self._convert_column_to_datetime(df, column) + if self.features: for feature in self.features: self._extract_feature(df, column, feature) created_features.append(f"{column}_{feature}") else: - self.logger.warning( - "No datetime features specified. Skipping feature extraction." - ) + if log: + self.logger.warning( + "No datetime features specified. Skipping feature extraction." + ) else: - self.logger.warning(f"Datetime column '{column}' not found in the DataFrame") + if log: + self.logger.warning("Datetime column '{column}' not found in the DataFrame") else: - self.logger.warning("No datetime columns specified. Skipping feature extraction.") + if log: + self.logger.warning("No datetime columns specified. Skipping feature extraction.") # drop original datetime columns if self.datetime_columns: df = df.drop(columns=self.datetime_columns) - self.logger.info(f"Dropped datetime columns: {self.datetime_columns}") + if log: + self.logger.info("Dropped datetime columns: {self.datetime_columns}") - self.logger.info(f"Created new features: {created_features}") - - data.flow = df - - return data + return df