From 169e989f9e4d617fa4cd449499a2f35536d3296a Mon Sep 17 00:00:00 2001 From: v1docq Date: Thu, 2 May 2024 13:32:45 +0300 Subject: [PATCH] release commit --- .../ts_forecasting/ts_forecasting_example.py | 4 +- fedot_ind/api/utils/checkers_collections.py | 2 +- fedot_ind/api/utils/industrial_strategy.py | 15 ++- .../preprocessing/data_convertor.py | 7 +- fedot_ind/core/models/ts_forecasting/glm.py | 116 ++++++++++++++++++ .../interfaces/industrial_model_strategy.py | 28 ----- .../industrial_preprocessing_strategy.py | 28 +++-- .../core/repository/constanst_repository.py | 6 +- .../data/default_operation_params.json | 3 +- .../industrial_implementations/abstract.py | 26 +++- .../optimisation.py | 5 - .../initializer_industrial_models.py | 4 +- fedot_ind/core/repository/model_repository.py | 3 +- fedot_ind/core/tuning/search_space.py | 7 +- fedot_ind/tools/example_utils.py | 2 +- 15 files changed, 190 insertions(+), 66 deletions(-) create mode 100644 fedot_ind/core/models/ts_forecasting/glm.py diff --git a/examples/automl_example/api_example/time_series/ts_forecasting/ts_forecasting_example.py b/examples/automl_example/api_example/time_series/ts_forecasting/ts_forecasting_example.py index a8f86968d..351d1254a 100644 --- a/examples/automl_example/api_example/time_series/ts_forecasting/ts_forecasting_example.py +++ b/examples/automl_example/api_example/time_series/ts_forecasting/ts_forecasting_example.py @@ -29,10 +29,10 @@ pop_size=10, industrial_strategy='forecasting_assumptions', n_jobs=2, - logging_level=30) + logging_level=40) for dataset_name in M4_FORECASTING_BENCH: - if dataset_name in industrial_loss and not dataset_name.__contains__('D'): + if dataset_name in industrial_loss and dataset_name.__contains__('W'):# print('Already evaluated, but with bad metrics') horizon = M4_FORECASTING_LENGTH[dataset_name[0]] api_config.update(task_params={'forecast_length': horizon}) diff --git a/fedot_ind/api/utils/checkers_collections.py b/fedot_ind/api/utils/checkers_collections.py index 7a039ee2b..58069f799 100644 --- a/fedot_ind/api/utils/checkers_collections.py +++ b/fedot_ind/api/utils/checkers_collections.py @@ -102,7 +102,7 @@ def _init_input_data(self) -> None: TsForecastingParams(forecast_length=self.task_params['forecast_length'])) if self.industrial_task_params is None: features_array = features_array[:-self.task_params['forecast_length']] - target = features_array[-self.task_params['forecast_length']:] + target = features_array self.input_data = InputData.from_numpy_time_series( features_array=features_array, target_array=target, diff --git a/fedot_ind/api/utils/industrial_strategy.py b/fedot_ind/api/utils/industrial_strategy.py index 76366fb0f..89d3b520e 100644 --- a/fedot_ind/api/utils/industrial_strategy.py +++ b/fedot_ind/api/utils/industrial_strategy.py @@ -81,10 +81,13 @@ def _forecasting_strategy(self, input_data): self.config_dict['timeout'] = round(self.config_dict['timeout'] / 3) self.solver = {} for model_name, init_assumption in FEDOT_TS_FORECASTING_ASSUMPTIONS.items(): - self.config_dict['initial_assumption'] = init_assumption.build() - industrial = Fedot(**self.config_dict) - industrial.fit(input_data) - self.solver.update({model_name: industrial}) + try: + self.config_dict['initial_assumption'] = init_assumption.build() + industrial = Fedot(**self.config_dict) + industrial.fit(input_data) + self.solver.update({model_name: industrial}) + except Exception: + self.logger.info(f'Failed during fit stage - {model_name}') def _forecasting_exogenous_strategy(self, input_data): self.logger.info('TS exogenous forecasting algorithm was applied') @@ -167,8 +170,8 @@ def _federated_predict(self, def _forecasting_predict(self, input_data, - mode: str = 'labels'): - labels_dict = {k: v.predict(input_data, mode) for k, v in self.solver.items()} + mode: str = True): + labels_dict = {k: v.predict(features=input_data, in_sample=mode) for k, v in self.solver.items()} return labels_dict def _kernel_predict(self, diff --git a/fedot_ind/core/architecture/preprocessing/data_convertor.py b/fedot_ind/core/architecture/preprocessing/data_convertor.py index f3fb47ee2..e4a853321 100644 --- a/fedot_ind/core/architecture/preprocessing/data_convertor.py +++ b/fedot_ind/core/architecture/preprocessing/data_convertor.py @@ -123,9 +123,9 @@ def convert_to_output_data(self, prediction, predict_data, output_data_type): - if type(prediction) is OutputData: + if isinstance(prediction, OutputData): output_data = prediction - elif type(prediction) is list: + elif isinstance(prediction, list): output_data = prediction[0] target = NumpyConverter( data=np.concatenate([p.target for p in prediction], axis=0)).convert_to_torch_format() @@ -167,6 +167,9 @@ def convert_to_industrial_composing_format(self, mode): array.reshape(array.shape[0], array.shape[1] * array.shape[2]) if array is not None and len(array.shape) > 2 else array for array in [self.input_data.features, self.input_data.target]] + # if new_features.shape[0] != new_target.shape[0]: + # min_samples = min(new_features.shape[0], new_target.shape[0]) + # new_features, new_target = new_features[:min_samples], new_target[:min_samples] input_data = InputData(idx=self.input_data.idx, features=new_features, target=new_target, diff --git a/fedot_ind/core/models/ts_forecasting/glm.py b/fedot_ind/core/models/ts_forecasting/glm.py new file mode 100644 index 000000000..748ae1946 --- /dev/null +++ b/fedot_ind/core/models/ts_forecasting/glm.py @@ -0,0 +1,116 @@ +from copy import copy +import numpy as np +import statsmodels.api as sm +from fedot.core.pipelines.pipeline_builder import PipelineBuilder +from fedot.core.repository.metrics_repository import RegressionMetricsEnum +from golem.core.tuning.optuna_tuner import OptunaTuner +from scipy.stats import kurtosis, skew +from statsmodels.genmod.families import Gamma, Gaussian, InverseGaussian +from statsmodels.genmod.families.links import identity, inverse_power, inverse_squared, log as lg +from statsmodels.genmod.generalized_linear_model import GLM + +from fedot.core.data.data import InputData, OutputData +from fedot.core.operations.evaluation.operation_implementations.data_operations.ts_transformations import ts_to_table +from fedot.core.operations.evaluation.operation_implementations.implementation_interfaces import ModelImplementation +from fedot.core.operations.operation_parameters import OperationParameters +from fedot.core.repository.dataset_types import DataTypesEnum + +from fedot_ind.core.repository.industrial_implementations.abstract import build_tuner + + +class GLMIndustrial(ModelImplementation): + """ Generalized linear models implementation """ + family_distribution = { + "gaussian": Gaussian(lg()), + "gamma": Gamma(lg()), + "inverse_gaussian": InverseGaussian(inverse_squared()) + } + + def __init__(self, params: OperationParameters): + super().__init__(params) + self.model = None + self.family_link = None + self.auto_reg = PipelineBuilder().add_node('ar').build() + self.ar_tuning_params = dict(tuner=OptunaTuner, metric=RegressionMetricsEnum.RMSE, tuning_timeout=3, + tuning_early_stop=20, tuning_iterations=50) + + @property + def family(self) -> str: + return self.params.get('family') + + @property + def link(self) -> str: + return self.params.get('link') + + def _check_glm_params(self, mean_kurtosis, mean_skew): + if np.logical_or(mean_kurtosis < -1, mean_kurtosis > 1) and np.logical_or(mean_skew < -0.2, mean_skew > 0.2): + family = 'gamma' + elif np.logical_or(mean_kurtosis < -2, mean_kurtosis > 2) and np.logical_or(mean_skew < -0.5, mean_skew > 0.5): + family = "inverse_gaussian" + else: + family = 'gaussian' + return family + + def fit(self, input_data): + pipeline_tuner, tuned_model = build_tuner(self, self.auto_reg, self.ar_tuning_params, input_data, 'head') + self.auto_reg = tuned_model + residual = self.auto_reg.root_node.fitted_operation[0].autoreg.resid + residual = np.nan_to_num(residual, nan=0, posinf=0, neginf=0) + family = self._check_glm_params(kurtosis(residual), skew(residual)) + self.family_link = self.family_distribution[family] + self.exog_residual = sm.add_constant(np.arange(0, residual.shape[0]).astype("float64")).reshape(-1, 2) + self.model = GLM( + exog=self.exog_residual, + endog=residual.astype("float64").reshape(-1, 1), + family=self.family_link + ).fit(method="lbfgs") + return self.model + + def predict(self, input_data): + autoreg_predict = self.auto_reg.predict(input_data) + input_data = copy(input_data) + parameters = input_data.task.task_params + forecast_length = parameters.forecast_length + old_idx = input_data.idx + if forecast_length == 1: + predictions = self.model.predict(np.concatenate([np.array([1]), + input_data.idx.astype("float64")]).reshape(-1, 2)) + else: + predictions = self.model.predict(self.exog_residual) + predictions = predictions[-forecast_length:] + predict = autoreg_predict.predict + predictions + output_data = self._convert_to_output(input_data, + predict=predict, + data_type=DataTypesEnum.table) + return output_data + + def predict_for_fit(self, input_data: InputData) -> OutputData: + autoreg_predict = self.auto_reg.predict(input_data) + input_data = copy(input_data) + parameters = input_data.task.task_params + forecast_length = parameters.forecast_length + old_idx = input_data.idx + target = input_data.target + predictions = self.model.predict(self.exog_residual) + predictions = predictions[-forecast_length:] + predict = autoreg_predict.predict + predictions + _, predict = ts_to_table(idx=old_idx, + time_series=predictions, + window_size=forecast_length) + new_idx, target_columns = ts_to_table(idx=old_idx, + time_series=target, + window_size=forecast_length) + + input_data.idx = new_idx + input_data.target = target_columns + + output_data = self._convert_to_output(input_data, + predict=predict, + data_type=DataTypesEnum.table) + return output_data + + def set_default(self): + """ Set default value of Family(link) """ + self.family_link = self.family_distribution['default'] + self.params.update(family='gaussian') + self.log.info("Invalid family. Changed to default value") diff --git a/fedot_ind/core/operation/interfaces/industrial_model_strategy.py b/fedot_ind/core/operation/interfaces/industrial_model_strategy.py index 26b3bf48b..20ae82b18 100644 --- a/fedot_ind/core/operation/interfaces/industrial_model_strategy.py +++ b/fedot_ind/core/operation/interfaces/industrial_model_strategy.py @@ -128,36 +128,8 @@ def __init__(self, operation_type: str, params: Optional[OperationParameters] = self.multi_dim_dispatcher.concat_func = np.vstack self.ensemble_func = np.sum - def _check_glm_params(self, mean_kurtosis, mean_skew): - if mean_kurtosis < 1 and mean_skew > 0.5: - family = 'gamma' - link = 'log' - elif mean_kurtosis > 3 and mean_skew > 0.5: - family = "inverse_gaussian" - link = 'inverse_power' - else: - family = 'gaussian' - link = 'identity' - return family, link - - def _create_channel_params(self, train_data): - if self.operation_type == 'glm': - if isinstance(train_data, list): - self.multi_dim_dispatcher.params_for_fit = [] - for x in train_data: - family, link = self._check_glm_params(kurtosis(x.features), skew(x.features)) - self.multi_dim_dispatcher.params_for_fit.append({'family': family, - 'link': link}) - - else: - family, link = self._check_glm_params(kurtosis(train_data.features), skew(train_data.features)) - self.multi_dim_dispatcher.params_for_fit = {'family': family, - 'link': link} - return train_data - def fit(self, train_data: InputData): train_data = self.multi_dim_dispatcher._convert_input_data(train_data) - train_data = self._create_channel_params(train_data) return self.multi_dim_dispatcher.fit(train_data) def predict(self, trained_operation, predict_data: InputData, output_mode: str = 'labels') -> OutputData: diff --git a/fedot_ind/core/operation/interfaces/industrial_preprocessing_strategy.py b/fedot_ind/core/operation/interfaces/industrial_preprocessing_strategy.py index 261d27378..7dd1299e2 100644 --- a/fedot_ind/core/operation/interfaces/industrial_preprocessing_strategy.py +++ b/fedot_ind/core/operation/interfaces/industrial_preprocessing_strategy.py @@ -96,18 +96,22 @@ def _predict_for_ndim(self, predict_data, trained_operation: list): if self.operation_condition_for_channel_independent.is_transform_input_fedot: prediction = list(operation.transform( data) for operation, data in zip(trained_operation, test_data)) - prediction = [pred.predict if type( - pred) is not np.ndarray else pred for pred in prediction] + if self.operation_type == 'lagged' or self.operation_type == 'sparse_lagged': + prediction = prediction + else: + prediction = [pred.predict if type( + pred) is not np.ndarray else pred for pred in prediction] else: prediction = list( operation.transform(data.features) for operation, data in zip(trained_operation, test_data)) elif self.operation_condition_for_channel_independent.have_predict_method: prediction = list(operation.predict(data) for operation, data in zip(trained_operation, test_data)) - prediction = [pred.predict for pred in prediction if type( - pred) is not np.array] - prediction = NumpyConverter(data=self.concat_func(prediction)).convert_to_torch_format() + prediction = [pred.predict for pred in prediction if type(pred) is not np.array] + + if not isinstance(prediction[0], OutputData): + prediction = NumpyConverter(data=self.concat_func(prediction)).convert_to_torch_format() return prediction def _custom_fit(self, train_data): @@ -161,23 +165,23 @@ def fit(self, train_data: InputData): # Elif model could be use for each dimension(channel) independently we use channel_independent mode elif self.operation_condition.is_channel_independent_operation: # Create independent copy of model for each channel - if isinstance(operation_implementation, list): + if self.operation_condition.is_operation_is_list_container: trained_operation = operation_implementation else: trained_operation = [deepcopy(operation_implementation) if self.operation_condition.is_list_container else deepcopy(operation_implementation) for i in range(len(train_data))] - train_data = train_data if self.operation_condition.is_list_container else [ - train_data] + train_data = train_data if self.operation_condition.is_list_container else [train_data] # Check if model have both or just one method (fit and transform_for_fit). For some model one of this method # could be not finished to use right now. if self.operation_condition.have_fit_method: operation_implementation = [operation.fit(data) for operation, data in zip( trained_operation, train_data)] - fit_method_is_not_implemented = operation_implementation[0] is None + if not type(operation_implementation[0]) == type(trained_operation[0]): operation_implementation = trained_operation + fit_method_is_not_implemented = operation_implementation[0] is None elif self.operation_condition.have_transform_method: operation_implementation = [operation.transform_for_fit(data) for operation, data in zip( trained_operation, train_data)] @@ -210,8 +214,7 @@ def predict_for_fit(self, trained_operation, predict_data: InputData, output_mod predict_data=predict_data, output_mode=output_mode) elif self.operation_condition.is_channel_independent_operation: - prediction = self.__operation_multidim_adapter( - trained_operation, predict_data) + prediction = self.__operation_multidim_adapter(trained_operation, predict_data) elif self.operation_condition.is_multi_dimensional_operation: if self.operation_condition.have_predict_for_fit_method: prediction = trained_operation.predict_for_fit( @@ -220,8 +223,7 @@ def predict_for_fit(self, trained_operation, predict_data: InputData, output_mod prediction = trained_operation.predict( predict_data, output_mode) - converted = self._convert_to_output( - prediction, predict_data_copy, data_type, output_mode) + converted = self._convert_to_output(prediction, predict_data_copy, data_type, output_mode) return converted def predict(self, trained_operation, predict_data: InputData, output_mode: str = 'default') -> OutputData: diff --git a/fedot_ind/core/repository/constanst_repository.py b/fedot_ind/core/repository/constanst_repository.py index 845ce54fe..91d99085d 100644 --- a/fedot_ind/core/repository/constanst_repository.py +++ b/fedot_ind/core/repository/constanst_repository.py @@ -263,12 +263,12 @@ class FedotOperationConstant(Enum): 'ar')} FEDOT_TS_FORECASTING_ASSUMPTIONS = { - 'lagged_ridge': PipelineBuilder().add_node('lagged').add_node('ridge'), + # 'lagged_ridge': PipelineBuilder().add_node('lagged').add_node('ridge'), 'eigen_ar': PipelineBuilder().add_node('eigen_basis', params={'low_rank_approximation': False, 'rank_regularization': 'explained_dispersion'}).add_node('ar'), - 'topological_ridge': PipelineBuilder().add_node('topological_extractor').add_node('ridge') - # 'glm': PipelineBuilder().add_node('glm') + # 'topological_ridge': PipelineBuilder().add_node('lagged').add_node('topological_extractor').add_node('ridge'), + 'glm': PipelineBuilder().add_node('glm') } FEDOT_ENSEMBLE_ASSUMPTIONS = { diff --git a/fedot_ind/core/repository/data/default_operation_params.json b/fedot_ind/core/repository/data/default_operation_params.json index 28ae79b2a..1df469e80 100644 --- a/fedot_ind/core/repository/data/default_operation_params.json +++ b/fedot_ind/core/repository/data/default_operation_params.json @@ -66,7 +66,8 @@ }, "ar": { "lag_1": 7, - "lag_2": 12 + "lag_2": 12, + "trend": "c" }, "arima": { "p": 2, diff --git a/fedot_ind/core/repository/industrial_implementations/abstract.py b/fedot_ind/core/repository/industrial_implementations/abstract.py index 60323591b..950ea5913 100644 --- a/fedot_ind/core/repository/industrial_implementations/abstract.py +++ b/fedot_ind/core/repository/industrial_implementations/abstract.py @@ -23,7 +23,32 @@ from typing import Optional, Tuple, Union, Sequence, List, Dict from fedot.core.data.data import InputData, OutputData +def split_time_series(data: InputData, + validation_blocks: Optional[int] = None, + **kwargs): + """ Split time series data into train and test parts + :param data: InputData object to split + :param validation_blocks: validation blocks are used for test + """ + + forecast_length = data.task.task_params.forecast_length + if validation_blocks is not None: + forecast_length *= validation_blocks + + target_length = len(data.target) + train_data = _split_input_data_by_indexes(data, index=np.arange(0, target_length - forecast_length),) + test_data = _split_input_data_by_indexes(data, index=np.arange(target_length - forecast_length, target_length), + retain_first_target=True) + + if validation_blocks is None: + # for in-sample + test_data.features = train_data.features + else: + # for out-of-sample + test_data.features = data.features + + return train_data, test_data def split_any(data: InputData, split_ratio: float, shuffle: bool, @@ -191,7 +216,6 @@ def transform_lagged(self, input_data: InputData): # Correct window size parameter self._check_and_correct_window_size(train_data.features, forecast_length) window_size = self.window_size - window_size = forecast_length new_idx, transformed_cols, new_target = transform_features_and_target_into_lagged(train_data, forecast_length, window_size) diff --git a/fedot_ind/core/repository/industrial_implementations/optimisation.py b/fedot_ind/core/repository/industrial_implementations/optimisation.py index cad5fecff..8cdb72d4e 100644 --- a/fedot_ind/core/repository/industrial_implementations/optimisation.py +++ b/fedot_ind/core/repository/industrial_implementations/optimisation.py @@ -49,11 +49,6 @@ def __init__(self, task_type): self.node_adapter = PipelineAdapter() self.task_type = Task(task_type) self.excluded_mutation = EXCLUDED_OPERATION_MUTATION[self.task_type.task_type.value] - # self.industrial_data_operations = [*list(AtomizedModel.INDUSTRIAL_PREPROC_MODEL.value.keys()), - # *list(AtomizedModel.INDUSTRIAL_CLF_PREPROC_MODEL.value.keys()), - # *list(AtomizedModel.FEDOT_PREPROC_MODEL.value.keys()), - # *list(AtomizedModel.NEURAL_MODEL.value.keys()), - # *list(AtomizedModel.FORECASTING_MODELS.value.keys())] self.industrial_data_operations = default_industrial_availiable_operation(self.task_type.task_type.value) self.excluded = [list(TEMPORARY_EXCLUDED[x].keys()) for x in TEMPORARY_EXCLUDED.keys()] diff --git a/fedot_ind/core/repository/initializer_industrial_models.py b/fedot_ind/core/repository/initializer_industrial_models.py index 6243defdf..a33496a3d 100644 --- a/fedot_ind/core/repository/initializer_industrial_models.py +++ b/fedot_ind/core/repository/initializer_industrial_models.py @@ -17,7 +17,8 @@ from fedot_ind.api.utils.path_lib import PROJECT_PATH from fedot_ind.core.repository.industrial_implementations.abstract import merge_predicts, preprocess_predicts, \ predict_for_fit, predict, predict_operation, postprocess_predicts, update_column_types, transform_lagged, \ - transform_lagged_for_fit, transform_smoothing, _build, split_any, _check_and_correct_window_size, merge_targets + transform_lagged_for_fit, transform_smoothing, _build, split_any, _check_and_correct_window_size, merge_targets, \ + split_time_series from fedot_ind.core.repository.industrial_implementations.optimisation import _get_default_industrial_mutations, \ MutationStrengthEnumIndustrial, has_no_data_flow_conflicts_in_industrial_pipeline, _crossover_by_type from fedot_ind.core.tuning.search_space import get_industrial_search_space @@ -70,6 +71,7 @@ def setup_repository(self): ## replace data split setattr(DataSourceSplitter, "build", _build) setattr(fedot_data_split, "_split_any", split_any) + setattr(fedot_data_split, "_split_time_series", split_time_series) # setattr(TSDataMerger, 'postprocess_predicts', postprocess_predicts) ## replace predict operations setattr(Operation, "_predict", predict_operation) diff --git a/fedot_ind/core/repository/model_repository.py b/fedot_ind/core/repository/model_repository.py index 50952584a..d79b3611f 100644 --- a/fedot_ind/core/repository/model_repository.py +++ b/fedot_ind/core/repository/model_repository.py @@ -52,6 +52,7 @@ from fedot_ind.core.models.nn.network_impl.tst import TSTModel from fedot_ind.core.models.quantile.quantile_extractor import QuantileExtractor from fedot_ind.core.models.recurrence.reccurence_extractor import RecurrenceExtractor +from fedot_ind.core.models.ts_forecasting.glm import GLMIndustrial from fedot_ind.core.operation.dummy.dummy_operation import DummyOperation from fedot_ind.core.operation.filtration.channel_filtration import ChannelCentroidFilter from fedot_ind.core.operation.filtration.feature_filtration import FeatureFilter @@ -175,7 +176,7 @@ class AtomizedModel(Enum): 'stl_arima': STLForecastARIMAImplementation, 'ets': ExpSmoothingImplementation, 'cgru': CGRUImplementation, - 'glm': GLMImplementation + 'glm': GLMIndustrial } FORECASTING_PREPROC = { diff --git a/fedot_ind/core/tuning/search_space.py b/fedot_ind/core/tuning/search_space.py index 8352b27a8..f87a1e186 100644 --- a/fedot_ind/core/tuning/search_space.py +++ b/fedot_ind/core/tuning/search_space.py @@ -409,6 +409,11 @@ def get_industrial_search_space(self): 'hyperopt-dist': hp.uniform, 'sampling-scope': [2, 800], 'type': 'continuous'} + , + 'trend': { + 'hyperopt-dist': hp.choice, + 'sampling-scope': [['n', 'c', 't', 'ct']], + 'type': 'categorical'} }, 'ets': { 'error': { @@ -499,7 +504,7 @@ def get_industrial_search_space(self): 'sampling-scope': [['mae', 'mse']], 'type': 'categorical'}, }, - 'topological_features': { + 'topological_extractor': { 'window_size_as_share': { 'hyperopt-dist': hp.uniform, 'sampling-scope': [0.1, 0.9], diff --git a/fedot_ind/tools/example_utils.py b/fedot_ind/tools/example_utils.py index 03b372a08..1825133eb 100644 --- a/fedot_ind/tools/example_utils.py +++ b/fedot_ind/tools/example_utils.py @@ -65,7 +65,7 @@ def industrial_forecasting_modelling_loop(dataset_name: str = None, val = 1000000000 for forecat_model, predict in labels.items(): industrial.predicted_labels = predict - + best_model = forecat_model current_metric = industrial.get_metrics(target=target, metric_names=('smape', 'rmse', 'median_absolute_error')) current_rmse = current_metric['rmse'].values[0]