From 4a522004aafc66c66ea6ff33bd824000fd3b43ea Mon Sep 17 00:00:00 2001
From: leostre
Date: Thu, 16 May 2024 19:18:02 +0300
Subject: [PATCH] added inverse scaling

---
 .../core/models/nn/network_impl/deepar.py    | 79 ++++++-------------
 .../core/models/nn/network_modules/losses.py | 21 +++--
 2 files changed, 36 insertions(+), 64 deletions(-)

diff --git a/fedot_ind/core/models/nn/network_impl/deepar.py b/fedot_ind/core/models/nn/network_impl/deepar.py
index a68f3eeb9..9ac3922df 100644
--- a/fedot_ind/core/models/nn/network_impl/deepar.py
+++ b/fedot_ind/core/models/nn/network_impl/deepar.py
@@ -34,11 +34,15 @@ def __init__(self):
     def forward(self, x, normalize=True):
         if normalize:
             self.means = x.mean(dim=-1, keepdim=True)
-            self.factors = torch.sqrt(x.std(dim=-1, keepdim=True,
+            self.factors = torch.sqrt(x.std(dim=-1, keepdim=True,  # unbiased=True results in really strange behavior of the affine transformer
                                             unbiased=False)) + self.eps
             return (x - self.means) / self.factors
         else:
-            return x * self.factors + self.means
+            factors, means = self.factors, self.means
+            if len(x.size()) == 4:
+                factors = factors[..., None]
+                means = means[..., None]
+            return x * factors + means

 class DeepARModule(Module):
     _loss_fns = {
@@ -56,11 +60,6 @@ def __init__(self, cell_type, input_size, hidden_size, rnn_layers, dropout, dist
         )
         self.hidden_size = hidden_size
         self.scaler = _TSScaler()
-        # self.scaler = RevIN(
-        #     affine=False,
-        #     input_dim=input_size,
-        #     dim=-1,  # -1 in case series-wise normalization, 0 for batch-wise, RNN needs series_wise
-        # )
         self.distribution = self._loss_fns[distribution]
         if distribution is not None:
             self.projector = Linear(self.hidden_size, len(self.distribution.distribution_arguments))
@@ -87,27 +86,25 @@ def _decode_whole_seq(self, ts: torch.Tensor, hidden_state: torch.Tensor):
         return output, hidden_state

     def forecast(self, prefix: torch.Tensor, horizon, mode='lagged', output_mode='quantiles', **mode_kw):
-        # assert prefix.size(-1) == self.rnn.input_size, f'Not enough information for forecasting! need {self.input_size}'
         self.eval()
         forecast = []
         if mode == 'lagged':
             with torch.no_grad():
                 for i in range(horizon):
                     output = self(prefix)[0]
                     forecast.append(self._transform_params(output, mode=output_mode, **mode_kw).detach().cpu())
-                    prediction = self._transform_params(output, mode='samples', n_samples=1)
+                    prediction = self._transform_params(output,
+                                                        mode='predictions')
                     prefix = torch.roll(prefix, -1, dims=-1)
-                    prefix[..., -1] = prediction
-            forecast = torch.stack(forecast).squeeze(1).permute(1, 2, 0)
+                    prefix[..., [-1]] = prediction
+            forecast = torch.stack(forecast)  # .squeeze(1).permute(1, 2, 0)
         elif mode == 'auto':
-            pass
+            assert self.rnn.input_size == 1, "autoregressive mode requires the features not to be lagged"
         else:
             raise ValueError('Unknown forecasting type!')
-
         return forecast
-
     def forward(self, x: torch.Tensor,
                 # n_samples: int = None,
                 mode='raw', **mode_kw):
@@ -124,33 +121,27 @@ def forward(self, x: torch.Tensor,
             assert mode == 'raw', "cannot use another mode, but 'raw' while training"
             return self._decode_whole_seq(x, hidden_state)
         else:
-            output = self._decode_whole_seq(x, hidden_state)
-            return self._transform_params(output, mode, **mode_kw)
-
-        output = self._decode_autoregressive(
-            x,
-            hidden_state, n_samples=0, mode=mode
-        )
-        return output, hidden_state
+            output, hidden_state = self._decode_whole_seq(x, hidden_state)
+            return self._transform_params(output,
+                                          mode=mode, **mode_kw), hidden_state

     def to_quantiles(self, params: torch.Tensor, quantiles=None):
         if quantiles is None:
             quantiles = self.quantiles
         distr = self.distribution.map_x_to_distribution(params)
-        return distr.icdf(quantiles)
+        return distr.icdf(quantiles).unsqueeze(1)

     def to_samples(self, params: torch.Tensor, n_samples=100):
         distr = self.distribution.map_x_to_distribution(params)
-        return distr.sample((n_samples,)).T  # distr_n x n_samples
+        return distr.sample((n_samples,)).permute(1, 2, 0)  # distr_n x n_samples

     def to_predictions(self, params: torch.Tensor):
         distr = self.distribution.map_x_to_distribution(params)
-        return distr.sample((1,)).T.squeeze()  # distr_n x 1
+        return distr.sample((1,)).permute(1, 2, 0)  # distr_n x 1

     def _transform_params(self, distr_params, mode='raw', **mode_kw):
-        # factors =
         if mode == 'raw':
-            transformed = distr_params
+            return distr_params
         elif mode == 'quantiles':
             transformed = self.to_quantiles(distr_params, **mode_kw)
         elif mode == 'predictions':
@@ -159,32 +150,12 @@ def _transform_params(self, distr_params, mode='raw', **mode_kw):
             transformed = self.to_samples(distr_params, **mode_kw)
         else:
             raise ValueError('Unexpected forecast mode!')
-        # transformed = self.scaler(transformed, False)
+        transformed = self.scaler(transformed, False)
+        return transformed

-    def predict(self, test_x: torch.Tensor, mode=None):
-        self.eval()
-        distr_params, _ = self(test_x)
-        return self._transform_params(distr_params, mode)
-
-    # def decode(self, x, hidden_state=None, n_samples=0, mode='raw'):
-    #     if hidden_state is None:
-    #         hidden_state = torch.zeros((self.hidden_size,)).float()
-    #     # if not n_samples:
-    #     #     output, _ = self._decode_whole_seq(x, hidden_state)
-    #     #     output = self._transform_params(output, mode=mode)
-    #     if True:
-    #         # make predictions which are fed into next step
-    #         output = self.decode_autoregressive(
-    #             first_target=x[:, 0],
-    #             first_hidden_state=hidden_state,
-    #             # target_scale=target_scale,
-    #             n_decoder_steps=x.size(1),
-    #             n_samples=n_samples,
-    #         )
-    #     return output
-
     def _decode_one(self, x,
                     idx,
                     hidden_state,
@@ -268,6 +239,7 @@ def __init__(self, params: Optional[OperationParameters] = {}):
     def _init_model(self, ts) -> tuple:
         self.loss_fn = DeepARModule._loss_fns[self.expected_distribution]()
         input_size = self.patch_len or ts.features.shape[-1]
+        self.patch_len = input_size
         self.model = DeepARModule(input_size=input_size,
                                   hidden_size=self.hidden_size,
                                   cell_type=self.cell_type,
@@ -298,6 +270,7 @@ def fit(self, input_data: InputData, split_data: bool = False):

     def _prepare_data(self, input_data: InputData, split_data):
         val_loader = None
+        # define patch_len
         if self.preprocess_to_lagged:
             self.patch_len = input_data.features.shape[-1]
             train_loader = self.__create_torch_loader(input_data)
@@ -443,7 +416,7 @@ def _get_train_val_loaders(self,
                               validation_blocks: int = None,
                               unsqueeze_0=True):
         if patch_len is None:
-            patch_len = self.horizon
+            patch_len = self.patch_len
         train_data = self.__ts_to_input_data(ts)
         if split_data:
             raise NotImplementedError('Problem with lagged_data splitting')
@@ -504,6 +477,7 @@ def _get_test_loader(self,
         test_data = self.__ts_to_input_data(test_data)
         if len(test_data.features.shape) == 1:
             test_data.features = test_data.features[None, ...]
+
         if not self.preprocess_to_lagged:
             features = HankelMatrix(time_series=test_data.features,
@@ -518,7 +492,6 @@ def _get_test_loader(self,
                                    convert_to_torch_format()).float()
             target = torch.from_numpy(DataConverter(
                 data=features).convert_to_torch_format()).float()
-
         test_loader = torch.utils.data.DataLoader(data.TensorDataset(features, target),
                                                   batch_size=self.batch_size, shuffle=False)
         return test_loader
diff --git a/fedot_ind/core/models/nn/network_modules/losses.py b/fedot_ind/core/models/nn/network_modules/losses.py
index 7a8382510..96e4ce8f2 100644
--- a/fedot_ind/core/models/nn/network_modules/losses.py
+++ b/fedot_ind/core/models/nn/network_modules/losses.py
@@ -1,4 +1,4 @@
-from typing import Optional, Union, List
+from typing import Optional, Union, List, Tuple

 import torch
 import torch.nn.functional as F
@@ -268,7 +268,7 @@ def __init__(
         self.reduction = getattr(torch, reduction) if reduction else lambda x: x

     @classmethod
-    def map_x_to_distribution(cls, x: torch.Tensor, affine: List[torch.Tensor, torch.Tensor] = None) -> distributions.Distribution:
+    def map_x_to_distribution(cls, x: torch.Tensor) -> distributions.Distribution:
         """
         Map a tensor of parameters to a probability distribution.

@@ -280,15 +280,14 @@
             class attribute ``distribution_class``
         """
         distr = cls._map_x_to_distribution(x)
-        if cls.need_affine or affine:
-            if affine:
-                loc = affine[0]
-                scale = affine[1]
-            else:
-                loc = x[..., 0]
-                scale = x[..., 1]
-            scaler = distributions.AffineTransform(loc=loc, scale=scale)
-            distr = distributions.TransformedDistribution(distr, [scaler])
+        transforms = []
+        if cls.need_affine:
+            loc = x[..., 0]
+            scale = x[..., 1]
+            scaler_from_output = distributions.AffineTransform(loc=loc, scale=scale)
+            transforms.append(scaler_from_output)
+        if transforms:
+            distr = distributions.TransformedDistribution(distr, transforms)
         return distr

     @classmethod
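
Note on the scaler change above: _TSScaler stores per-series means and factors on the normalizing pass, and the same module is then called with normalize=False to map model outputs back to the original scale. 4-D outputs carry an extra trailing samples/quantiles axis, so the saved 3-D statistics must be unsqueezed before broadcasting, which is what the new branch does. A minimal round-trip sketch of that behavior (the class below is illustrative, not the repository's _TSScaler, and the eps default is an assumption):

import torch

class TSScalerSketch:
    """Illustrative stand-in for _TSScaler; eps default assumed."""
    def __init__(self, eps: float = 1e-5):
        self.eps = eps
        self.means = None
        self.factors = None

    def __call__(self, x: torch.Tensor, normalize: bool = True) -> torch.Tensor:
        if normalize:
            # per-series statistics over the time (last) dimension
            self.means = x.mean(dim=-1, keepdim=True)
            self.factors = torch.sqrt(x.std(dim=-1, keepdim=True, unbiased=False)) + self.eps
            return (x - self.means) / self.factors
        factors, means = self.factors, self.means
        if x.dim() == 4:
            # broadcast the saved 3-D stats over the extra samples/quantiles axis
            factors, means = factors[..., None], means[..., None]
        return x * factors + means

scaler = TSScalerSketch()
x = torch.randn(8, 3, 50)      # batch x series x time
z = scaler(x, normalize=True)  # stores means/factors
assert torch.allclose(scaler(z, normalize=False), x, atol=1e-5)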
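Note on the losses.py change: map_x_to_distribution now routes inverse scaling through torch.distributions rather than ad-hoc tensor math. When need_affine is set, the first two output columns are read as loc and scale and wrapped into an AffineTransform, so sample() and icdf() return values on the original data scale without manual rescaling. A rough sketch of the pattern (the Normal base and the four-column parameter layout are assumptions for illustration, not the repository's exact classes):

import torch
from torch import distributions

def map_x_to_distribution_sketch(x: torch.Tensor) -> distributions.Distribution:
    # x[..., 0], x[..., 1]: loc/scale of the rescaling affine transform (assumed layout);
    # x[..., 2], x[..., 3]: parameters of the base Normal (scale assumed positive)
    base = distributions.Normal(loc=x[..., 2], scale=x[..., 3])
    affine = distributions.AffineTransform(loc=x[..., 0], scale=x[..., 1])
    return distributions.TransformedDistribution(base, [affine])

params = torch.tensor([[10.0, 2.0, 0.0, 1.0]])  # rescale N(0, 1) to mean 10, std 2
distr = map_x_to_distribution_sketch(params)
samples = distr.sample((1000,))
print(samples.mean().item(), samples.std().item())  # close to 10 and 2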