diff --git a/fedot_ind/core/operation/decomposition/matrix_decomposition/dmd_decomposition.py b/fedot_ind/core/operation/decomposition/matrix_decomposition/dmd_decomposition.py index ace5c4b7a..ea16b7429 100644 --- a/fedot_ind/core/operation/decomposition/matrix_decomposition/dmd_decomposition.py +++ b/fedot_ind/core/operation/decomposition/matrix_decomposition/dmd_decomposition.py @@ -78,7 +78,7 @@ def symmetric_decompose(X, Y, rank): Yf = np.zeros((rank, rank)) for i in range(r): Yf[i, i] = np.real(C1[i, i]) / S[i, i] - for j in range(i + 1, r): + for j in range(i + 1, rank): Yf[i, j] = (S[i, i] * np.conj(C1[j, i]) + S[j, j] * C1[i, j]) / (S[i, i] ** 2 + S[j, j] ** 2) Yf = Yf + Yf.T - np.diag(np.diag(np.real(Yf))) # elif method == 'skewsymmetric': diff --git a/fedot_ind/tools/synthetic/anomaly_generator.py b/fedot_ind/tools/synthetic/anomaly_generator.py index 17b33e262..5d8d90077 100644 --- a/fedot_ind/tools/synthetic/anomaly_generator.py +++ b/fedot_ind/tools/synthetic/anomaly_generator.py @@ -138,8 +138,8 @@ def plot_anomalies(self, initial_ts, modified_ts, anomaly_intervals_dict): label=cls) for cls in anomaly_intervals_dict.keys()] for anomaly_class, intervals in anomaly_intervals_dict.items(): - for interval in intervals.transform_for_fit(', '): - start_idx, end_idx = map(int, interval.transform_for_fit(':')) + for interval in intervals: + start_idx, end_idx = interval ax.axvspan(start_idx, end_idx, alpha=0.3, color=color_dict[anomaly_class]) # Put a legend to the right of the current axis diff --git a/fedot_ind/tools/synthetic/ts_generator.py b/fedot_ind/tools/synthetic/ts_generator.py index 37709902d..f42734621 100644 --- a/fedot_ind/tools/synthetic/ts_generator.py +++ b/fedot_ind/tools/synthetic/ts_generator.py @@ -32,7 +32,7 @@ def __init__(self, params: dict): self.ts_types = {'sin': SinWave, 'random_walk': RandomWalk, 'auto_regression': AutoRegression, - 'smooth_normal': SmoothNormal} + 'smooth_normal': SmoothNormal} self.params = params def __define_seed(self): diff --git a/tests/unit/core/operation/decomposition/test_physic_dmd.py b/tests/unit/core/operation/decomposition/test_physic_dmd.py new file mode 100644 index 000000000..76ecbea84 --- /dev/null +++ b/tests/unit/core/operation/decomposition/test_physic_dmd.py @@ -0,0 +1,23 @@ +from typing import Callable + +import numpy as np +import pytest + +from fedot_ind.core.operation.optimization.dmd.physic_dmd import piDMD + + +@pytest.fixture +def feature_target(): + return np.random.rand(10, 10), np.random.rand(10, 10) + + +@pytest.mark.parametrize('method', ('exact', 'orthogonal')) +def test_fit_exact(feature_target, method): + decomposer = piDMD(method=method) + features, target = feature_target + + fitted_linear_operator, eigenvals, eigenvectors = decomposer.fit(train_features=features, + train_target=target) + for i in [eigenvals, eigenvectors]: + assert isinstance(i, np.ndarray) + assert isinstance(fitted_linear_operator, Callable) diff --git a/tests/unit/core/operation/optimization/test_feature_space.py b/tests/unit/core/operation/optimization/test_feature_space.py new file mode 100644 index 000000000..9312ac0ef --- /dev/null +++ b/tests/unit/core/operation/optimization/test_feature_space.py @@ -0,0 +1,36 @@ +import numpy as np +import pandas as pd +import pytest + +from fedot_ind.core.operation.optimization.FeatureSpace import VarianceSelector + + +@pytest.fixture +def model_data(): + return dict(quantile=np.random.rand(10, 10), + signal=np.random.rand(10, 10), + topological=np.random.rand(10, 10)) + + +def test_get_best_model(model_data): + selector = VarianceSelector(models=model_data) + best_model = selector.get_best_model() + assert isinstance(best_model, str) + + +def test_transform(model_data): + selector = VarianceSelector(models=model_data) + projected = selector.transform(model_data=model_data['quantile'], + principal_components=np.random.rand(10, 2)) + assert isinstance(projected, np.ndarray) + + +def test_select_discriminative_features(model_data): + selector = VarianceSelector(models=model_data) + projected = selector.transform(model_data=model_data['quantile'], + principal_components=np.random.rand(10, 2)) + + discriminative_feature = selector.select_discriminative_features(model_data=pd.DataFrame(model_data['quantile']), + projected_data=projected) + + assert isinstance(discriminative_feature, dict) diff --git a/tests/unit/core/operation/optimization/test_structure_optimization.py b/tests/unit/core/operation/optimization/test_structure_optimization.py new file mode 100644 index 000000000..2c53a4ba4 --- /dev/null +++ b/tests/unit/core/operation/optimization/test_structure_optimization.py @@ -0,0 +1,51 @@ +import pytest +import torch +from torch import nn +from torch.utils.data import DataLoader, Dataset + +from fedot_ind.core.architecture.experiment.nn_experimenter import ClassificationExperimenter, FitParameters +from fedot_ind.core.operation.optimization.structure_optimization import SVDOptimization, SFPOptimization + +NUM_SAMPLES = 100 +INPUT_SIZE = 10 +OUTPUT_SIZE = 5 +BATCH_SIZE = 32 + + +class DummyModel(nn.Module): + def __init__(self, input_size, output_size): + super(DummyModel, self).__init__() + self.linear = nn.Linear(input_size, output_size) + + def forward(self, x): + return self.linear(x) + + +class SimpleDataset(Dataset): + def __init__(self, num_samples, input_size, output_size): + self.inputs = torch.rand((num_samples, input_size)) + self.targets = torch.randint(0, output_size, (num_samples,)) + + def __len__(self): + return len(self.inputs) + + def __getitem__(self, index): + return self.inputs[index], self.targets[index] + + +@pytest.fixture +def dummy_data_loader(): + dataset = SimpleDataset(NUM_SAMPLES, INPUT_SIZE, OUTPUT_SIZE) + shuffle = True + return DataLoader(dataset, + batch_size=BATCH_SIZE, + shuffle=shuffle) + + +@pytest.fixture() +def solver(): + model = DummyModel(INPUT_SIZE, OUTPUT_SIZE) + experimenter = ClassificationExperimenter(model=model, + metric='accuracy', + device='cpu') + return experimenter diff --git a/tests/unit/core/operation/transformation/test_splitter.py b/tests/unit/core/operation/transformation/test_splitter.py index 9c143651c..9f1ef2cd5 100644 --- a/tests/unit/core/operation/transformation/test_splitter.py +++ b/tests/unit/core/operation/transformation/test_splitter.py @@ -1,6 +1,8 @@ +import warnings + import numpy as np import pytest -from matplotlib import pyplot as plt +from matplotlib import get_backend, pyplot as plt from fedot_ind.core.operation.transformation.splitter import TSTransformer @@ -68,11 +70,15 @@ def test_unique_strategy(frequent_splitter): @pytest.mark.parametrize('binarize, plot', ([True, False], [False, False], [True, True], [False, True])) def test_frequent_strategy(frequent_splitter, time_series, anomaly_dict, binarize, plot): + # switch to non-Gui, preventing plots being displayed + # suppress UserWarning that agg cannot show plots + curr_backend = get_backend() + plt.switch_backend("Agg") + warnings.filterwarnings("ignore", "Matplotlib is currently using agg") features, target = frequent_splitter._frequent_strategy(series=time_series, anomaly_dict=anomaly_dict, plot=plot, binarize=binarize) - plt.close("all") assert isinstance(features, np.ndarray) assert isinstance(target, np.ndarray) if binarize: diff --git a/tests/unit/tools/test_anomaly_generator.py b/tests/unit/tools/test_anomaly_generator.py new file mode 100644 index 000000000..cb574dc61 --- /dev/null +++ b/tests/unit/tools/test_anomaly_generator.py @@ -0,0 +1,64 @@ +import warnings + +from matplotlib import get_backend + +from fedot_ind.tools.synthetic.anomaly_generator import AnomalyGenerator +import pytest +import matplotlib.pyplot as plt + + +@pytest.fixture +def config(): + return {'dip': {'level': 20, + 'number': 5, + 'min_anomaly_length': 10, + 'max_anomaly_length': 20}, + 'peak': {'level': 2, + 'number': 5, + 'min_anomaly_length': 5, + 'max_anomaly_length': 10}, + 'decrease_dispersion': {'level': 70, + 'number': 2, + 'min_anomaly_length': 10, + 'max_anomaly_length': 15}, + 'increase_dispersion': {'level': 50, + 'number': 2, + 'min_anomaly_length': 10, + 'max_anomaly_length': 15}, + 'shift_trend_up': {'level': 10, + 'number': 2, + 'min_anomaly_length': 10, + 'max_anomaly_length': 20}, + 'add_noise': {'level': 80, + 'number': 2, + 'noise_type': 'uniform', + 'min_anomaly_length': 10, + 'max_anomaly_length': 20} + } + + +@pytest.fixture +def synthetic_ts(): + return {'ts_type': 'sin', + 'length': 1000, + 'amplitude': 10, + 'period': 500} + + +def test_generate(config, synthetic_ts): + # switch to non-Gui, preventing plots being displayed + # suppress UserWarning that agg cannot show plots + curr_backend = get_backend() + plt.switch_backend("Agg") + warnings.filterwarnings("ignore", "Matplotlib is currently using agg") + + generator = AnomalyGenerator(config=config) + init_synth_ts, mod_synth_ts, synth_inters = generator.generate(time_series_data=synthetic_ts, + plot=True, + overlap=0.1) + + assert len(init_synth_ts) == len(mod_synth_ts) + for anomaly_type in synth_inters: + for interval in synth_inters[anomaly_type]: + ts_range = range(len(init_synth_ts)) + assert interval[0] in ts_range and interval[1] in ts_range diff --git a/tests/unit/tools/test_ts_datasets_generator.py b/tests/unit/tools/test_ts_datasets_generator.py new file mode 100644 index 000000000..0c599902a --- /dev/null +++ b/tests/unit/tools/test_ts_datasets_generator.py @@ -0,0 +1,12 @@ +from fedot_ind.tools.synthetic.ts_datasets_generator import TimeSeriesDatasetsGenerator + + +def test_generate_data(): + generator = TimeSeriesDatasetsGenerator(num_samples=80, + max_ts_len=50, + n_classes=3, + test_size=0.5) + (X_train, y_train), (X_test, y_test) = generator.generate_data() + + assert X_train.shape[0] == X_test.shape[0] + assert X_train.shape[1] == X_test.shape[1] diff --git a/tests/unit/tools/test_ts_generator.py b/tests/unit/tools/test_ts_generator.py new file mode 100644 index 000000000..9c93ea49c --- /dev/null +++ b/tests/unit/tools/test_ts_generator.py @@ -0,0 +1,33 @@ +import numpy as np +import pytest + +from fedot_ind.tools.synthetic.ts_generator import TimeSeriesGenerator + + +@pytest.fixture +def config(): + return dict(random_walk={'ts_type': 'random_walk', + 'length': 1000, + 'start_val': 36.6}, + sin={'ts_type': 'sin', + 'length': 1000, + 'amplitude': 10, + 'period': 500}, + auto_regression={'ts_type': 'auto_regression', + 'length': 1000, + 'ar_params': [0.5, -0.3, 0.2], + 'initial_values': None}, + smooth_normal={'ts_type': 'smooth_normal', + 'length': 1000, + 'window_size': 300} + + ) + + +@pytest.mark.parametrize('kind', ('random_walk', 'sin', 'auto_regression', 'smooth_normal')) +def test_get_ts(config, kind): + specific_config = config[kind] + generator = TimeSeriesGenerator(params=specific_config) + ts = generator.get_ts() + assert isinstance(ts, np.ndarray) + assert len(ts) == specific_config['length']