From 2ff4d5687040f0547016239a9a98712127f77010 Mon Sep 17 00:00:00 2001
From: v1docq
Date: Tue, 28 Nov 2023 13:56:37 +0300
Subject: [PATCH] merge all feature filtration in 1 file

---
 .../core/models/nn/network_modules/losses.py |   3 +-
 .../filtration/feature_filtration.py         | 127 +++++++++++++++++-
 .../filtration/quantile_filtration.py        |  10 --
 .../operation/optimization/FeatureSpace.py   |  60 ---------
 .../transformation/FeatureSpaceReducer.py    |  71 ----------
 5 files changed, 127 insertions(+), 144 deletions(-)
 delete mode 100644 fedot_ind/core/operation/filtration/quantile_filtration.py
 delete mode 100644 fedot_ind/core/operation/optimization/FeatureSpace.py
 delete mode 100644 fedot_ind/core/operation/transformation/FeatureSpaceReducer.py

diff --git a/fedot_ind/core/models/nn/network_modules/losses.py b/fedot_ind/core/models/nn/network_modules/losses.py
index 31b2f8497..81f552d98 100644
--- a/fedot_ind/core/models/nn/network_modules/losses.py
+++ b/fedot_ind/core/models/nn/network_modules/losses.py
@@ -5,6 +5,7 @@
 from fastai.torch_core import Module
 import torch.nn.functional as F
 
+
 class HuberLoss(nn.Module):
     """Huber loss
 
@@ -167,4 +168,4 @@ def __init__(self):
         super().__init__()
 
     def forward(self, input: Tensor, target: Tensor) -> Tensor:
-        return 100 * torch.mean(2 * torch.abs(input - target) / (torch.abs(target) + torch.abs(input)) + 1e-8)
\ No newline at end of file
+        return 100 * torch.mean(2 * torch.abs(input - target) / (torch.abs(target) + torch.abs(input)) + 1e-8)
diff --git a/fedot_ind/core/operation/filtration/feature_filtration.py b/fedot_ind/core/operation/filtration/feature_filtration.py
index 4bcf052d4..9011ad130 100644
--- a/fedot_ind/core/operation/filtration/feature_filtration.py
+++ b/fedot_ind/core/operation/filtration/feature_filtration.py
@@ -4,10 +4,10 @@
 import pandas as pd
 from fedot.core.operations.operation_parameters import OperationParameters
 from fedot.core.pipelines.node import PipelineNode
-from fedot.core.pipelines.pipeline_builder import PipelineBuilder
 from scipy.spatial.distance import cdist
 from scipy.stats import stats
-
+from sklearn.feature_selection import VarianceThreshold
+from sklearn.decomposition import PCA
 from fedot_ind.core.operation.IndustrialCachableOperation import IndustrialCachableOperationImplementation
 from fedot_ind.core.operation.transformation.basis.fourier import FourierBasisImplementation
 from fedot_ind.core.operation.transformation.window_selector import WindowSizeSelector
@@ -87,3 +87,126 @@ def filter_signal(self, data):
                                        'approximation': self.fourier_approx}). \
             transform(data).features
         return model
+
+
+class FeatureSpaceReducer:
+
+    def reduce_feature_space(self, features: pd.DataFrame,
+                             var_threshold: float = 0.01,
+                             corr_threshold: float = 0.98) -> pd.DataFrame:
+        """Method responsible for reducing feature space.
+
+        Args:
+            features: dataframe with extracted features.
+            var_threshold: cut-off value for variance threshold.
+            corr_threshold: cut-off value for correlation threshold.
+
+        Returns:
+            Dataframe with reduced feature space.
+
+        """
+        init_feature_space_size = features.shape[1]
+
+        features = self._drop_stable_features(features, var_threshold)
+        features_new = self._drop_correlated_features(corr_threshold, features)
+
+        final_feature_space_size = features_new.shape[1]
+
+        if init_feature_space_size != final_feature_space_size:
+            self.logger.info(f'Feature space reduced from {init_feature_space_size} to {final_feature_space_size}')
+
+        return features_new
+
+    def _drop_correlated_features(self, corr_threshold, features):
+        features_corr = features.corr(method='pearson')
+        mask = np.ones(features_corr.columns.size) - np.eye(features_corr.columns.size)
+        df_corr = mask * features_corr
+        drops = []
+        for col in df_corr.columns.values:
+            # continue if the feature is already in the drop list
+            if np.in1d([col], drops):
+                continue
+
+            index_of_corr_feature = df_corr[abs(df_corr[col]) > corr_threshold].index
+            drops = np.union1d(drops, index_of_corr_feature)
+
+        if len(drops) == 0:
+            self.logger.info('No correlated features found')
+            return features
+
+        features_new = features.copy()
+        features_new.drop(drops, axis=1, inplace=True)
+        return features_new
+
+    def _drop_stable_features(self, features, var_threshold):
+        try:
+            variance_reducer = VarianceThreshold(threshold=var_threshold)
+            variance_reducer.fit_transform(features)
+            unstable_features_mask = variance_reducer.get_support()
+            features = features.loc[:, unstable_features_mask]
+        except ValueError:
+            self.logger.info('Variance reducer has not found any features with low variance')
+        return features
+
+    def validate_window_size(self, ts: np.ndarray):
+        if self.window_size is None or self.window_size > ts.shape[0] / 2:
+            self.logger.info('Window size is not defined or too big (> ts_length/2)')
+            self.window_size, _ = WindowSizeSelector(time_series=ts).get_window_size()
+            self.logger.info(f'Window size was set to {self.window_size}')
+
+
+class VarianceSelector:
+    """
+    Class that accepts a dictionary whose keys are model names and whose values are data arrays
+    in np.array format. The class implements an algorithm to determine the "best" set of features
+    and the best model in the dictionary.
+    """
+
+    def __init__(self, models):
+        """
+        Initialize the class with the models dictionary.
+        """
+        self.models = models
+        self.principal_components = {}
+        self.model_scores = {}
+
+    def get_best_model(self, **model_hyperparams):
+        """
+        Method to determine the "best" set of features and the best model in the dictionary.
+        Principal Component Analysis and the proportion of explained variance are used as the
+        estimation criterion. If several models tie, the model with fewer principal components
+        and a larger value of explained variance is chosen.
+        """
+        best_model = None
+        best_score = 0
+        for model_name, model_data in self.models.items():
+            pca = PCA()
+            pca.fit(model_data)
+            filtered_score = [x for x in pca.explained_variance_ratio_ if x > 0.05]
+            score = sum(filtered_score)
+            self.principal_components.update({model_name: pca.components_[:len(filtered_score)].T})
+            self.model_scores.update({model_name: (score, len(filtered_score))})
+            if score > best_score:
+                best_score = score
+                best_model = model_name
+        return best_model
+
+    def transform(self,
+                  model_data,
+                  principal_components):
+        if isinstance(principal_components, str):
+            principal_components = self.principal_components[principal_components]
+        projected = np.dot(model_data, principal_components)
+        return projected
+
+    def select_discriminative_features(self,
+                                       model_data,
+                                       projected_data,
+                                       correlation_level: float = 0.8):
+        discriminative_feature = {}
+        for PCT in range(projected_data.shape[1]):
+            correlation_df = model_data.corrwith(pd.Series(projected_data[:, PCT]), axis=0, drop=False)
+            discriminative_feature_list = [k for k, x in zip(correlation_df.index.values, correlation_df.values) if
+                                           abs(x) > correlation_level]
+            discriminative_feature.update({f'{PCT + 1} principal components': discriminative_feature_list})
+        return discriminative_feature
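A minimal usage sketch of the two classes in their new location (the synthetic data, logger wiring, and
expected outputs are illustrative assumptions, not part of the patch; note that FeatureSpaceReducer defines
no __init__ and never initializes self.logger, so a logger is attached manually here):

    import logging

    import numpy as np
    import pandas as pd

    from fedot_ind.core.operation.filtration.feature_filtration import (
        FeatureSpaceReducer,
        VarianceSelector,
    )

    logging.basicConfig(level=logging.INFO)
    rng = np.random.default_rng(42)

    # FeatureSpaceReducer: drop near-constant and highly correlated columns.
    base = rng.normal(size=200)
    features = pd.DataFrame({
        'f1': base,
        'f2': base + rng.normal(scale=1e-3, size=200),  # near-duplicate of f1
        'f3': np.full(200, 0.5),                        # zero variance
        'f4': rng.normal(size=200),
    })
    reducer = FeatureSpaceReducer()
    reducer.logger = logging.getLogger('FeatureSpaceReducer')  # class has no __init__
    reduced = reducer.reduce_feature_space(features)
    print(reduced.columns.tolist())  # expected: ['f1', 'f4']

    # VarianceSelector: pick the model whose PCA retains the most explained variance.
    models = {
        'model_a': rng.normal(size=(100, 8)),
        'model_b': rng.normal(size=(100, 8)) * np.array([5., 4., 3., 1., 0.1, 0.1, 0.1, 0.1]),
    }
    selector = VarianceSelector(models=models)
    best = selector.get_best_model()
    projected = selector.transform(models[best], principal_components=best)
    print(best, selector.model_scores[best], projected.shape)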
+ """ + best_model = None + best_score = 0 + for model_name, model_data in self.models.items(): + pca = PCA() + pca.fit(model_data) + filtred_score = [x for x in pca.explained_variance_ratio_ if x > 0.05] + score = sum(filtred_score) + self.principal_components.update({model_name: pca.components_[:, :len(filtred_score)]}) + self.model_scores.update({model_name: (score, len(filtred_score))}) + if score > best_score: + best_score = score + best_model = model_name + return best_model + + def transform(self, + model_data, + principal_components): + if type(principal_components) == str: + principal_components = self.principal_components[principal_components] + projected = np.dot(model_data, principal_components) + return projected + + def select_discriminative_features(self, + model_data, + projected_data, + corellation_level: float = 0.8): + discriminative_feature = {} + for PCT in range(projected_data.shape[1]): + correlation_df = pd.DataFrame.corrwith(model_data, pd.Series(projected_data[:, PCT]), axis=0, drop=False) + discriminative_feature_list = [k for k, x in zip(correlation_df.index.values, correlation_df.values) if + abs(x) > corellation_level] + discriminative_feature.update({f'{PCT + 1} principal components': discriminative_feature_list}) + return discriminative_feature diff --git a/fedot_ind/core/operation/filtration/quantile_filtration.py b/fedot_ind/core/operation/filtration/quantile_filtration.py deleted file mode 100644 index 255935dd3..000000000 --- a/fedot_ind/core/operation/filtration/quantile_filtration.py +++ /dev/null @@ -1,10 +0,0 @@ -import numpy as np - - -def quantile_filter(input_data, predicted_data, threshold: float = 0.9, lp_norm: int = 1): - reconstruction_error = np.linalg.norm(input_data - predicted_data, lp_norm, axis=1) / np.linalg.norm( - input_data, lp_norm, axis=1) - quantile = np.quantile(reconstruction_error, threshold) - outlier_idx = [np.where(np.isclose(reconstruction_error, idx_outlier))[0][0] - for idx_outlier in reconstruction_error[reconstruction_error > quantile]] - return outlier_idx diff --git a/fedot_ind/core/operation/optimization/FeatureSpace.py b/fedot_ind/core/operation/optimization/FeatureSpace.py deleted file mode 100644 index 8c9d9dddb..000000000 --- a/fedot_ind/core/operation/optimization/FeatureSpace.py +++ /dev/null @@ -1,60 +0,0 @@ -import numpy as np -import pandas as pd -from sklearn.decomposition import PCA - - -class VarianceSelector: - """ - Class that accepts a dictionary as input, the keys of which are the names of models and the values are arrays - of data in the np.array format.The class implements an algorithm to determine the "best" set of features and the - best model in the dictionary. - """ - - def __init__(self, models): - """ - Initialize the class with the models dictionary. - """ - self.models = models - self.principal_components = {} - self.model_scores = {} - - def get_best_model(self, **model_hyperparams): - """ - Method to determine the "best" set of features and the best model in the dictionary. - As an estimation algorithm, use the Principal Component analysis method and the proportion of the explained variance. - If there are several best models, then a model with a smaller number of principal components and a - larger value of the explained variance is chosen. 
- """ - best_model = None - best_score = 0 - for model_name, model_data in self.models.items(): - pca = PCA() - pca.fit(model_data) - filtred_score = [x for x in pca.explained_variance_ratio_ if x > 0.05] - score = sum(filtred_score) - self.principal_components.update({model_name: pca.components_[:, :len(filtred_score)]}) - self.model_scores.update({model_name: (score, len(filtred_score))}) - if score > best_score: - best_score = score - best_model = model_name - return best_model - - def transform(self, - model_data, - principal_components): - if type(principal_components) == str: - principal_components = self.principal_components[principal_components] - projected = np.dot(model_data, principal_components) - return projected - - def select_discriminative_features(self, - model_data, - projected_data, - corellation_level: float = 0.8): - discriminative_feature = {} - for PCT in range(projected_data.shape[1]): - correlation_df = pd.DataFrame.corrwith(model_data, pd.Series(projected_data[:, PCT]), axis=0, drop=False) - discriminative_feature_list = [k for k, x in zip(correlation_df.index.values, correlation_df.values) if - abs(x) > corellation_level] - discriminative_feature.update({f'{PCT + 1} principal components': discriminative_feature_list}) - return discriminative_feature diff --git a/fedot_ind/core/operation/transformation/FeatureSpaceReducer.py b/fedot_ind/core/operation/transformation/FeatureSpaceReducer.py deleted file mode 100644 index 68ef6fd92..000000000 --- a/fedot_ind/core/operation/transformation/FeatureSpaceReducer.py +++ /dev/null @@ -1,71 +0,0 @@ -import numpy as np -import pandas as pd -from sklearn.feature_selection import VarianceThreshold - -from fedot_ind.core.operation.transformation.window_selector import WindowSizeSelector - - -class FeatureSpaceReducer: - - def reduce_feature_space(self, features: pd.DataFrame, - var_threshold: float = 0.01, - corr_threshold: float = 0.98) -> pd.DataFrame: - """Method responsible for reducing feature space. - - Args: - features: dataframe with extracted features. - corr_threshold: cut-off value for correlation threshold. - var_threshold: cut-off value for variance threshold. - - Returns: - Dataframe with reduced feature space. 
diff --git a/fedot_ind/core/operation/transformation/FeatureSpaceReducer.py b/fedot_ind/core/operation/transformation/FeatureSpaceReducer.py
deleted file mode 100644
index 68ef6fd92..000000000
--- a/fedot_ind/core/operation/transformation/FeatureSpaceReducer.py
+++ /dev/null
@@ -1,71 +0,0 @@
-import numpy as np
-import pandas as pd
-from sklearn.feature_selection import VarianceThreshold
-
-from fedot_ind.core.operation.transformation.window_selector import WindowSizeSelector
-
-
-class FeatureSpaceReducer:
-
-    def reduce_feature_space(self, features: pd.DataFrame,
-                             var_threshold: float = 0.01,
-                             corr_threshold: float = 0.98) -> pd.DataFrame:
-        """Method responsible for reducing feature space.
-
-        Args:
-            features: dataframe with extracted features.
-            corr_threshold: cut-off value for correlation threshold.
-            var_threshold: cut-off value for variance threshold.
-
-        Returns:
-            Dataframe with reduced feature space.
-
-        """
-        init_feature_space_size = features.shape[1]
-
-        features = self._drop_stable_features(features, var_threshold)
-        features_new = self._drop_correlated_features(corr_threshold, features)
-
-        final_feature_space_size = features_new.shape[1]
-
-        if init_feature_space_size != final_feature_space_size:
-            self.logger.info(f'Feature space reduced from {init_feature_space_size} to {final_feature_space_size}')
-
-        return features_new
-
-    def _drop_correlated_features(self, corr_threshold, features):
-        features_corr = features.corr(method='pearson')
-        mask = np.ones(features_corr.columns.size) - np.eye(features_corr.columns.size)
-        df_corr = mask * features_corr
-        drops = []
-        for col in df_corr.columns.values:
-            # continue if the feature is already in the drop list
-            if np.in1d([col], drops):
-                continue
-
-            index_of_corr_feature = df_corr[abs(df_corr[col]) > corr_threshold].index
-            drops = np.union1d(drops, index_of_corr_feature)
-
-        if len(drops) == 0:
-            self.logger.info('No correlated features found')
-            return features
-
-        features_new = features.copy()
-        features_new.drop(drops, axis=1, inplace=True)
-        return features_new
-
-    def _drop_stable_features(self, features, var_threshold):
-        try:
-            variance_reducer = VarianceThreshold(threshold=var_threshold)
-            variance_reducer.fit_transform(features)
-            unstable_features_mask = variance_reducer.get_support()
-            features = features.loc[:, unstable_features_mask]
-        except ValueError:
-            self.logger.info('Variance reducer has not found any features with low variance')
-        return features
-
-    def validate_window_size(self, ts: np.ndarray):
-        if self.window_size is None or self.window_size > ts.shape[0] / 2:
-            self.logger.info('Window size is not defined or too big (> ts_length/2)')
-            self.window_size, _ = WindowSizeSelector(time_series=ts).get_window_size()
-            self.logger.info(f'Window size was set to {self.window_size}')
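Since this patch consolidates the three modules, downstream code that imported the deleted files will need
updating. A sketch of the assumed import migration (the commented call sites are hypothetical):

    # Before this patch:
    # from fedot_ind.core.operation.optimization.FeatureSpace import VarianceSelector
    # from fedot_ind.core.operation.transformation.FeatureSpaceReducer import FeatureSpaceReducer
    # from fedot_ind.core.operation.filtration.quantile_filtration import quantile_filter

    # After this patch, both classes live in a single module:
    from fedot_ind.core.operation.filtration.feature_filtration import (
        FeatureSpaceReducer,
        VarianceSelector,
    )
    # quantile_filter is removed outright; see the inline sketch above for a possible drop-in replacement.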