
Commit

rework search space for wavelet_basis, rework dask multithread, add basic variance feature filtration to extractors
v1docq committed Oct 22, 2024
1 parent ec69b4d commit c85d28d
Showing 13 changed files with 842 additions and 1,190 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

99 changes: 99 additions & 0 deletions examples/tutorial/time_series/ts_classification/tmp.py
@@ -0,0 +1,99 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from hyperopt import hp

from fedot_ind.core.architecture.pipelines.abstract_pipeline import AbstractPipeline, ApiTemplate


def plot_mean_sample(X, y, labels: list = [], n_channel: int = 1):
mean_sample = []
if len(labels) == 0:
labels = list(np.unique(y))
for label in labels:
        mean_sample.append(np.mean(X[y == label], axis=0))  # mean sample for this class
# ax = plt.gca()
channels = [f'Channel {x}' for x in range(n_channel)]
df = pd.DataFrame(mean_sample).T
df.columns = labels
df.plot(kind='line', subplots=True, layout=(1, len(labels)), figsize=(20, 10))
    plt.legend(fontsize='small', loc='upper left', bbox_to_anchor=(1, 1))
plt.show()


# %%
def plot_mean_sample_multi(X, y, labels: list = [], n_channel: int = None):
mean_sample = {}
if len(labels) == 0:
labels = list(np.unique(y))
if n_channel is None:
n_channel = X.shape[1]
channels = [f'Channel {x}' for x in range(n_channel)]
for label in labels:
mask = y == label
for chn in range(n_channel):
mean_sample.update(
                {f'Label_{label}_channel_{chn}': np.mean(X[mask.flatten(), chn, :], axis=0)})  # per-channel mean sample for this class
# ax = plt.gca()
df = pd.DataFrame(mean_sample)
df.plot(kind='line')
    plt.suptitle('Mean samples per class')
    plt.legend(fontsize='small', loc='upper left', bbox_to_anchor=(1, 1))
plt.show()


# %% md
### Topo Hyperparams
# %%
topological_params = {'window_size': {'hyperopt-dist': hp.choice, 'sampling-scope': [[x for x in range(5, 50, 5)]]},
                      'stride': {'hyperopt-dist': hp.choice, 'sampling-scope': [[x for x in range(1, 10, 1)]]}}
# %%
stat_params = {'window_size': {'hyperopt-dist': hp.choice, 'sampling-scope': [[x for x in range(5, 50, 5)]]},
'stride': {'hyperopt-dist': hp.choice, 'sampling-scope': [[x for x in range(1, 10, 1)]]},
'add_global_features': {'hyperopt-dist': hp.choice, 'sampling-scope': [[True, False]]}}
# %%
recurrence_params = {'window_size': {'hyperopt-dist': hp.choice, 'sampling-scope': [[x for x in range(5, 50, 5)]]},
'stride': {'hyperopt-dist': hp.choice, 'sampling-scope': [[x for x in range(1, 10, 1)]]},
                     'rec_metric': {'hyperopt-dist': hp.choice, 'sampling-scope': [['cosine', 'euclidean']]},
                     'image_mode': {'hyperopt-dist': hp.choice, 'sampling-scope': [[True, False]]}}
# %%
rec_metric = 'cosine'
image_mode = True
window_size = 10
stride = 1
# %%
topological_node_dict = {'topological_extractor': {'window_size': window_size,
'stride': stride}}
# %%
recurrence_node_dict = {'recurrence_extractor': {'window_size': window_size,
'stride': stride,
'rec_metric': rec_metric,
'image_mode': image_mode}}

finetune = False
metric_names = ('f1', 'accuracy', 'precision', 'roc_auc')
api_config = dict(problem='classification',
metric='accuracy',
timeout=15,
pop_size=20,
with_tunig=False,
n_jobs=-1,
logging_level=10)
pipeline_creator = AbstractPipeline(task='classification')
ECG = 'ECG200'
topological_model = ['topological_extractor', 'rf']
recurrence_model = ['recurrence_extractor', 'quantile_extractor', 'rf']
# %%
ecg_dataset = pipeline_creator.create_input_data(ECG)

if __name__ == "__main__":
topo_list_model = {
'topological_extractor': {'window_size': 10},
'logit': {}}
result_dict_topo = ApiTemplate(api_config=api_config,
metric_list=metric_names).eval(dataset=ECG,
finetune=finetune,
initial_assumption=topo_list_model)
_ = 1
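The two plotting helpers above are defined but never called in the script. A minimal usage sketch with synthetic data (array shapes are assumptions chosen to match the indexing inside the helpers, not taken from the ECG200 dataset):

```python
import numpy as np

rng = np.random.default_rng(0)

# Univariate case: 60 series of length 96, two classes, 1-D labels
X_uni = rng.normal(size=(60, 96))
y_uni = rng.integers(0, 2, size=60)
plot_mean_sample(X_uni, y_uni)

# Multivariate case: 60 samples, 3 channels, 96 time steps, 2-D labels
# (plot_mean_sample_multi flattens the label mask, so a column vector works)
X_multi = rng.normal(size=(60, 3, 96))
y_multi = rng.integers(0, 2, size=(60, 1))
plot_mean_sample_multi(X_multi, y_multi)
```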
5 changes: 4 additions & 1 deletion fedot_ind/core/architecture/preprocessing/data_convertor.py
@@ -405,7 +405,10 @@ def have_fit_method(self):

@property
def have_predict_method(self):
return dir(self.operation_example).__contains__('predict')
if hasattr(self.operation_example, 'predict'):
return True if callable(self.operation_example.predict) else False
else:
return False

@property
def have_predict_for_fit_method(self):
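The reworked `have_predict_method` property above spells the check out with an explicit if/else. An equivalent, more compact idiom (a sketch with an illustrative stand-in class, not part of this commit) would be:

```python
class OperationWrapper:
    def __init__(self, operation_example):
        self.operation_example = operation_example

    @property
    def have_predict_method(self) -> bool:
        # True only when the wrapped operation exposes a callable `predict`
        return callable(getattr(self.operation_example, 'predict', None))


class _Dummy:
    def predict(self, x):
        return x


print(OperationWrapper(_Dummy()).have_predict_method)   # True
print(OperationWrapper(object()).have_predict_method)   # False
```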
8 changes: 7 additions & 1 deletion fedot_ind/core/models/base_extractor.py
@@ -12,6 +12,7 @@
from fedot_ind.core.architecture.abstraction.decorators import convert_to_input_data
from fedot_ind.core.metrics.metrics_implementation import *
from fedot_ind.core.operation.IndustrialCachableOperation import IndustrialCachableOperationImplementation
from fedot_ind.core.operation.filtration.feature_filtration import FeatureSpaceReducer
from fedot_ind.core.operation.transformation.data.hankel import HankelMatrix
from fedot_ind.core.repository.constanst_repository import STAT_METHODS, STAT_METHODS_GLOBAL

@@ -32,14 +33,15 @@ def __init__(self, params: Optional[OperationParameters] = None):
self.relevant_features = None
self.logger = logging.getLogger(self.__class__.__name__)
self.logging_params = {'jobs': self.n_processes}
self.feature_filter = FeatureSpaceReducer()
self.predict = None

def fit(self, input_data: InputData):
pass

def extract_features(self, x, y) -> pd.DataFrame:
"""
For those cases when you need to use feature extractor as a stangalone object
For those cases when you need to use feature extractor as a standalone object
"""
input_data = init_input_data(x, y)
transformed_features = self.transform(input_data, use_cache=self.use_cache)
@@ -65,6 +67,10 @@ def _transform(self, input_data: InputData) -> np.array:
self.predict = self._clean_predict(stacked_data)
self.predict = self.predict.reshape(self.predict.shape[0], -1)

if not self.feature_filter.is_fitted:
self.predict = self.feature_filter.reduce_feature_space(self.predict)
else:
self.predict = self.predict[:, :, self.feature_filter.feature_mask]
self.relevant_features = feature_matrix[0].supplementary_data['feature_name']
return self.predict

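The new branch in `_transform` above fits the `FeatureSpaceReducer` on the first call and then reuses its stored `feature_mask` on later calls. A self-contained sketch of that fit-once / reuse-mask pattern (class name, threshold, and the (samples, channels, features) shape are assumptions for illustration):

```python
import numpy as np


class VarianceMaskFilter:
    """Keep features whose variance exceeds a threshold; fit once, reuse the mask."""

    def __init__(self, threshold: float = 0.01):
        self.threshold = threshold
        self.is_fitted = False
        self.feature_mask = None

    def apply(self, features: np.ndarray) -> np.ndarray:
        # features is assumed to be shaped (n_samples, n_channels, n_features)
        if not self.is_fitted:
            variances = features.var(axis=(0, 1))
            self.feature_mask = variances > self.threshold
            self.is_fitted = True
        return features[:, :, self.feature_mask]


X_train = np.random.rand(32, 1, 10)
X_train[:, :, :3] = 0.5             # three constant (zero-variance) features
X_test = np.random.rand(8, 1, 10)

flt = VarianceMaskFilter()
print(flt.apply(X_train).shape)     # (32, 1, 7) - mask is fitted here
print(flt.apply(X_test).shape)      # (8, 1, 7)  - the same mask is reused
```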
53 changes: 21 additions & 32 deletions fedot_ind/core/operation/filtration/feature_filtration.py
@@ -85,7 +85,7 @@ def _compute_component_corr(self, sample):
component_idx = np.where(
correlation_level == cor_level)[0][0] + 1
grouped_v = grouped_v + \
sample[component_idx, :]
sample[component_idx, :]
if component_idx in component_idx_list:
component_idx_list.remove(
component_idx)
@@ -121,8 +121,11 @@ def filter_signal(self, data):


class FeatureSpaceReducer:
def __init__(self):
self.is_fitted = False
self.feature_mask = None

def reduce_feature_space(self, features: pd.DataFrame,
def reduce_feature_space(self, features: np.array,
var_threshold: float = 0.01,
corr_threshold: float = 0.98) -> pd.DataFrame:
"""Method responsible for reducing feature space.
@@ -136,45 +139,31 @@ def reduce_feature_space(self, features: pd.DataFrame,
Dataframe with reduced feature space.
"""
features.shape[1]

features = self._drop_stable_features(features, var_threshold)
features, self.feature_mask = self._drop_constant_features(features, var_threshold)
features_new = self._drop_correlated_features(corr_threshold, features)
self.is_fitted = True
return features_new

def _drop_correlated_features(self, corr_threshold, features):
features_corr = features.corr(method='pearson')
mask = np.ones(features_corr.columns.size) - \
np.eye(features_corr.columns.size)
df_corr = mask * features_corr
drops = []
for col in df_corr.columns.values:
# continue if the feature is already in the drop list
if np.in1d([col], drops):
continue

index_of_corr_feature = df_corr[abs(
df_corr[col]) > corr_threshold].index
drops = np.union1d(drops, index_of_corr_feature)

if len(drops) == 0:
self.logger.info('No correlated features found')
return features

features_new = features.copy()
features_new.drop(drops, axis=1, inplace=True)
return features_new

def _drop_stable_features(self, features, var_threshold):
features_corr = np.corrcoef(features.squeeze().T)
n_features = features_corr.shape[0]
identity_matrix = np.eye(n_features)
features_corr = features_corr - identity_matrix
correlation_mask = abs(features_corr) > corr_threshold
correlated_features = list(set(np.where(correlation_mask == True)[0]))
percent_of_filtred_feats = (1 - (n_features - len(correlated_features)) / n_features) * 100
return features if percent_of_filtred_feats > 50 else features

def _drop_constant_features(self, features, var_threshold):
try:
variance_reducer = VarianceThreshold(threshold=var_threshold)
variance_reducer.fit_transform(features)
variance_reducer.fit_transform(features.squeeze())
unstable_features_mask = variance_reducer.get_support()
features = features.loc[:, unstable_features_mask]
features = features[:, :, unstable_features_mask]
except ValueError:
self.logger.info(
'Variance reducer has not found any features with low variance')
return features
return features, unstable_features_mask

def validate_window_size(self, ts: np.ndarray):
if self.window_size is None or self.window_size > ts.shape[0] / 2:
@@ -242,7 +231,7 @@ def select_discriminative_features(self,
model_data, pd.Series(projected_data[:, PCT]), axis=0, drop=False)
discriminative_feature_list = [
k for k,
x in zip(
x in zip(
correlation_df.index.values,
correlation_df.values) if abs(x) > correlation_level]
discriminative_feature.update(
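A standalone sketch of the filtering idea used by `FeatureSpaceReducer` — a variance mask taken from sklearn's `VarianceThreshold.get_support()` followed by a numpy correlation check — applied to a plain 2-D feature matrix (the function name, thresholds and the greedy drop rule are illustrative assumptions, not the class above):

```python
import numpy as np
from sklearn.feature_selection import VarianceThreshold


def drop_constant_and_correlated(features_2d: np.ndarray,
                                 var_threshold: float = 0.01,
                                 corr_threshold: float = 0.98) -> np.ndarray:
    # 1) variance filter: boolean mask of columns to keep
    reducer = VarianceThreshold(threshold=var_threshold)
    reducer.fit(features_2d)
    keep_mask = reducer.get_support()
    filtered = features_2d[:, keep_mask]

    # 2) correlation filter: zero the diagonal, then greedily drop one column
    #    from every highly correlated pair
    corr = np.corrcoef(filtered.T)
    np.fill_diagonal(corr, 0.0)
    to_drop = set()
    for i in range(corr.shape[0]):
        if i in to_drop:
            continue
        partners = np.where(np.abs(corr[i]) > corr_threshold)[0]
        to_drop.update(p for p in partners if p > i)
    keep = [j for j in range(filtered.shape[1]) if j not in to_drop]
    return filtered[:, keep]


X = np.random.rand(50, 6)
X[:, 0] = 1.0            # constant column -> removed by the variance filter
X[:, 5] = X[:, 4] * 2    # perfectly correlated pair -> one of them removed
print(drop_constant_and_correlated(X).shape)  # (50, 4)
```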
2 changes: 1 addition & 1 deletion fedot_ind/core/operation/transformation/basis/wavelet.py
@@ -26,10 +26,10 @@ def __init__(self, params: Optional[OperationParameters] = None):
self.n_components = params.get('n_components')
self.wavelet = params.get('wavelet')
self.use_low_freq = params.get('low_freq', False)
self.scales = params.get('scale', WAVELET_SCALES)
self.basis = None
self.discrete_wavelets = DISCRETE_WAVELETS
self.continuous_wavelets = CONTINUOUS_WAVELETS
self.scales = WAVELET_SCALES

def __repr__(self):
return 'WaveletBasisImplementation'
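With `self.scales = params.get('scale', WAVELET_SCALES)` the scales become a tunable operation parameter. The reworked search space itself is presumably in one of the large diffs not rendered on this page; a hypothetical entry in the same style as the parameter dictionaries in `tmp.py` might look like this (keys and candidate values are assumptions):

```python
from hyperopt import hp

wavelet_basis_params = {'wavelet': {'hyperopt-dist': hp.choice,
                                    'sampling-scope': [['db5', 'sym5', 'coif5', 'mexh', 'morl']]},
                        'scale': {'hyperopt-dist': hp.choice,
                                  'sampling-scope': [[[2 ** p for p in range(1, 6)],
                                                      [x for x in range(2, 20, 2)]]]}}
```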
19 changes: 11 additions & 8 deletions fedot_ind/core/optimizer/IndustrialEvoOptimizer.py
@@ -26,14 +26,7 @@
graph_generation_params: GraphGenerationParams,
graph_optimizer_params: GPAlgorithmParameters):

for mutation in graph_optimizer_params.mutation_types:
try:
is_invalid = mutation.__name__.__contains__('resample')
except Exception:
is_invalid = mutation.name.__contains__('resample')
if is_invalid:
graph_optimizer_params.mutation_types.remove(mutation)

graph_optimizer_params = self._exclude_resample_from_mutations(graph_optimizer_params)
graph_optimizer_params.adaptive_mutation_type = RandomAgent(actions=graph_optimizer_params.mutation_types,
probs=FEDOT_MUTATION_STRATEGY[
'params_mutation_strategy'])
@@ -52,6 +45,16 @@ def _create_initial_population(self, initial_assumption):
for graph in initial_assumption]
return initial_individuals

def _exclude_resample_from_mutations(self, graph_optimizer_params):
for mutation in graph_optimizer_params.mutation_types:
try:
is_invalid = mutation.__name__.__contains__('resample')
except Exception:
is_invalid = mutation.name.__contains__('resample')
if is_invalid:
graph_optimizer_params.mutation_types.remove(mutation)
return graph_optimizer_params

def _initial_population(self, evaluator: EvaluationOperator):
""" Initializes the initial population """
# Adding of initial assumptions to history as zero generation
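The extracted `_exclude_resample_from_mutations` helper still removes items from `mutation_types` while iterating over that same list, which can skip the element after a removed one. A sketch of an equivalent filter that rebuilds the list instead (attribute names taken from the method above, behaviour otherwise assumed):

```python
def _exclude_resample_from_mutations(self, graph_optimizer_params):
    def _label(mutation):
        # mutations may be plain functions or named enum-like objects
        return getattr(mutation, '__name__', None) or getattr(mutation, 'name', '')

    graph_optimizer_params.mutation_types = [
        m for m in graph_optimizer_params.mutation_types
        if 'resample' not in _label(m)]
    return graph_optimizer_params
```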
54 changes: 27 additions & 27 deletions fedot_ind/core/repository/IndustrialDispatcher.py
@@ -4,6 +4,7 @@
from datetime import datetime
from typing import Optional, Tuple

import dask
from golem.core.log import Log
from golem.core.optimisers.genetic.evaluation import MultiprocessingDispatcher
from golem.core.optimisers.genetic.operators.operator import EvaluationOperator, PopulationT
@@ -13,7 +14,7 @@
from golem.core.optimisers.timer import Timer
from golem.utilities.memory import MemoryAnalytics
from golem.utilities.utilities import determine_n_jobs
from joblib import wrap_non_picklable_objects, parallel_backend
from joblib import wrap_non_picklable_objects
from pymonad.either import Either
from pymonad.maybe import Maybe

@@ -30,17 +31,14 @@ def dispatch(self, objective: ObjectiveFunction,
return self.evaluate_with_cache

def _multithread_eval(self, individuals_to_evaluate):
with parallel_backend(backend='dask',
n_jobs=self.n_jobs,
scatter=[individuals_to_evaluate]
):
log = Log().get_parameters()
evaluation_results = list(map(lambda ind:
self.industrial_evaluate_single(self,
graph=ind.graph,
uid_of_individual=ind.uid,
logs_initializer=log),
individuals_to_evaluate))
log = Log().get_parameters()
evaluation_results = list(map(lambda ind:
self.industrial_evaluate_single(self,
graph=ind.graph,
uid_of_individual=ind.uid,
logs_initializer=log),
individuals_to_evaluate))
evaluation_results = dask.compute(*evaluation_results)
return evaluation_results

def _eval_at_least_one(self, individuals):
@@ -80,7 +78,22 @@ def evaluate_population(self, individuals: PopulationT) -> PopulationT:
logging_level=logging.INFO)
return successful_evals

# @delayed
@dask.delayed
def eval_ind(self, graph, uid_of_individual):
adapted_evaluate = self._adapter.adapt_func(self._evaluate_graph)
start_time = timeit.default_timer()
fitness, graph = adapted_evaluate(graph)
end_time = timeit.default_timer()
eval_time_iso = datetime.now().isoformat()
eval_res = GraphEvalResult(
uid_of_individual=uid_of_individual,
fitness=fitness,
graph=graph,
metadata={
'computation_time_in_seconds': end_time - start_time,
'evaluation_time_iso': eval_time_iso})
return eval_res

@wrap_non_picklable_objects
def industrial_evaluate_single(self,
graph: OptGraph,
@@ -100,17 +113,4 @@ def industrial_evaluate_single(self,
# in case of multiprocessing run
Log.setup_in_mp(*logs_initializer)

adapted_evaluate = self._adapter.adapt_func(self._evaluate_graph)
start_time = timeit.default_timer()
fitness, graph = adapted_evaluate(graph)
end_time = timeit.default_timer()
eval_time_iso = datetime.now().isoformat()

eval_res = GraphEvalResult(
uid_of_individual=uid_of_individual,
fitness=fitness,
graph=graph,
metadata={
'computation_time_in_seconds': end_time - start_time,
'evaluation_time_iso': eval_time_iso})
return eval_res
return self.eval_ind(graph, uid_of_individual)
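The reworked dispatcher builds one `dask.delayed` task per individual and materialises the whole population with a single `dask.compute` call, instead of wrapping joblib in a dask `parallel_backend`. A minimal, self-contained sketch of that pattern (the evaluation function is a stand-in, not the real graph evaluator):

```python
import dask


@dask.delayed
def evaluate(individual: int) -> dict:
    # stand-in for the real single-graph evaluation
    return {'uid': individual, 'fitness': individual ** 2}


population = range(5)
lazy_results = [evaluate(ind) for ind in population]  # nothing runs yet
results = dask.compute(*lazy_results)                  # executed in parallel by dask's scheduler
print(results)
```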
5 changes: 3 additions & 2 deletions fedot_ind/core/repository/constanst_repository.py
@@ -335,9 +335,10 @@ class FedotOperationConstant(Enum):
'classification': PipelineBuilder().add_node('logit'),
'regression': PipelineBuilder().add_node('treg')
}

# mutation order - [param_change,model_change,add_preproc_model,drop_model,add_model]
FEDOT_MUTATION_STRATEGY = {
'params_mutation_strategy': [0.4, 0.2, 0.2, 0.1, 0.1],
# 'params_mutation_strategy': [0.6, 0.25, 0.05, 0.05, 0.05],
'params_mutation_strategy': [0.7, 0.3, 0.00, 0.00, 0.0],
'growth_mutation_strategy': [0.15, 0.15, 0.3, 0.1, 0.3],
'regularization_mutation_strategy': [0.2, 0.3, 0.1, 0.3, 0.1],
}
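The probability vector is positional over the mutation order given in the comment, so the new `params_mutation_strategy` only ever samples parameter changes (0.7) and model changes (0.3). A small numpy sketch of what the random agent's sampling amounts to (names are illustrative):

```python
import numpy as np

mutations = ['param_change', 'model_change', 'add_preproc_model', 'drop_model', 'add_model']
probs = [0.7, 0.3, 0.0, 0.0, 0.0]  # new params_mutation_strategy

rng = np.random.default_rng(42)
sampled = rng.choice(mutations, size=10, p=probs)
print(sampled)  # only 'param_change' and 'model_change' can appear
```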
