From 44728e564cccd0febd413521016fb33d91489152 Mon Sep 17 00:00:00 2001 From: Ekaterina Noskova Date: Wed, 6 Nov 2024 14:48:40 +0100 Subject: [PATCH] Fix codestyle --- gadma/__init__.py | 20 +- gadma/optimizers/bayesian_optimization.py | 452 +++++++++++----------- gadma/optimizers/gaussian_process.py | 170 ++++---- 3 files changed, 321 insertions(+), 321 deletions(-) diff --git a/gadma/__init__.py b/gadma/__init__.py index f01897a..36c19a0 100644 --- a/gadma/__init__.py +++ b/gadma/__init__.py @@ -54,14 +54,14 @@ except ImportError: demesdraw = None -#try: -# import GPy -#except ImportError: -# GPy = None -#try: -# import GPyOpt -#except ImportError: -# GPyOpt = None +# try: +# import GPy +# except ImportError: +# GPy = None +# try: +# import GPyOpt +# except ImportError: +# GPyOpt = None try: import smac # NOQA import ConfigSpace # NOQA @@ -86,8 +86,8 @@ demes_available = demes is not None demesdraw_available = demesdraw is not None -#GPy_available = GPy is not None -#GPyOpt_available = GPyOpt is not None +# GPy_available = GPy is not None +# GPyOpt_available = GPyOpt is not None bayesmark_available = bayesmark is not None from .data import DataHolder, SFSDataHolder, VCFDataHolder # NOQA diff --git a/gadma/optimizers/bayesian_optimization.py b/gadma/optimizers/bayesian_optimization.py index aed5629..f46e172 100644 --- a/gadma/optimizers/bayesian_optimization.py +++ b/gadma/optimizers/bayesian_optimization.py @@ -83,232 +83,232 @@ def choose_kernel_if_needed( return optimizer.kernel_name, None -#class GPyOptBayesianOptimizer(GlobalOptimizer, ConstrainedOptimizer): -# """ -# Class for Bayesian optimization -# """ -# def __init__(self, kernel="Auto", ARD=True, acquisition_type='MPI', -# random_type='resample', custom_rand_gen=None, -# log_transform=False, maximize=False): -# if not GPy_available or not GPyOpt_available: -# raise ValueError("Install GPyOpt and GPy to use " -# "Bayesian optimization.") -# self.kernel_name = kernel -# self.ARD = ARD -# self.acquisition_type = acquisition_type -# super(GPyOptBayesianOptimizer, self).__init__( -# random_type=random_type, -# custom_rand_gen=custom_rand_gen, -# log_transform=log_transform, -# maximize=maximize -# ) -# -# def _get_kernel_class(self): -# if self.kernel_name.lower() == "auto": -# return None -# kernel_name = self.kernel_name.lower().capitalize() -# if kernel_name == "Rbf": -# kernel_name = "RBF" -# return op.attrgetter(kernel_name)(GPy.kern) -# -# def get_kernel(self, config_space): -# kernel = self._get_kernel_class()(len(config_space), ARD=self.ARD) -# return kernel -# -# def get_config_space(self, variables): -# gpy_domain = [] -# for var in variables: -# gpy_domain.append({'name': var.name, -# 'type': var.var_type, -# 'domain': var.domain}) -# return gpy_domain -# -# @staticmethod -# def _write_report_to_stream(variables, run_info, stream): -# -# def get_x_repr(x): -# if isinstance(x, WeightedMetaArray): -# return x.str_as_list() -# return str(list(x)) -# -# bo_obj = run_info.bo_obj -# if bo_obj is not None: -# bo_obj._compute_results() -# x_best = run_info.result.x -# y_best = run_info.result.y -# n_iter = run_info.result.n_iter -# -# print('====================== Iteration %05d ======================' % -# n_iter, file=stream) -# -# if n_iter == 0: -# print("Initial design:", file=stream) -# else: -# print("Got points:", file=stream) -# -# print("Fitness function\tParameters", file=stream) -# for x, y in zip(run_info.result.X_out, run_info.result.Y_out): -# print(f"{y}\t{get_x_repr(x)}", file=stream) -# -# if bo_obj is not None: -# print('\nCurrent state of the model:', file=stream) -# -# print(str(bo_obj.model), file=stream) -# print(bo_obj.model.model.kern.lengthscale, file=stream) -# -# if hasattr(run_info, "gp_train_times"): -# print("\nTime for GP training:", run_info.gp_train_times[n_iter], -# file=stream) -# if hasattr(run_info, "gp_predict_times"): -# print("Time for GP prediction:", -# run_info.gp_predict_times[n_iter], -# file=stream) -# if hasattr(run_info, "acq_opt_times"): -# print("Time for acq. optim.:", -# run_info.acq_opt_times[n_iter], file=stream) -# if hasattr(run_info, "eval_times"): -# print("Time of evaluation:", -# run_info.eval_times[n_iter], file=stream) -# if hasattr(run_info, "iter_times"): -# print("Total time of iteration:", -# run_info.iter_times[n_iter], file=stream) -# -# print('=============================================================', -# end="\n\n", file=stream) -# print('*************************************************************', -# file=stream) -# print('Current optimum: %0.3f' % y_best, file=stream) -# print(f'On parameters: {get_x_repr(x_best)}', file=stream) -# print('*************************************************************', -# file=stream) -# print("\n", file=stream) -# -# def _create_run_info(self): -# """ -# Creates the initial run_info. It has the following fields: -# * `result` - empty :class:`gadma.optimizers.OptimizerResult` with\ -# `n_iter`==-1. -# * `bo_obj` - Object of BO from GpyOpt. -# """ -# run_info = super(GPyOptBayesianOptimizer, self)._create_run_info() -# run_info.bo_obj = None -# return run_info -# -# def valid_restore_file(self, save_file): -# try: -# run_info = self.load(save_file) -# except Exception: -# return False -# if (not isinstance(run_info.result.n_eval, int) or -# not isinstance(run_info.result.n_iter, int)): -# return False -# return True -# -# def save(self, run_info, save_file): -# # run_info.bo_obj could not be deepcopied and pickled so we ignore it -# info = self._create_run_info() -# info.result = copy.deepcopy(run_info.result) -# # also change X_out to be equal to X_total. For good restore -# info.result.X_out = info.result.X -# info.result.Y_out = info.result.Y -# super(GPyOptBayesianOptimizer, self).save(info, save_file) -# -# def load(self, save_file): -# run_info = super(GPyOptBayesianOptimizer, self).load(save_file) -# run_info.result.X_out = list(run_info.result.X) -# run_info.result.Y_out = list(run_info.result.Y) -# return run_info -# -# def get_model(self, config_space): -# from GPyOpt.models import GPModel -# kernel = self.get_kernel(config_space) -# gp_model = GPModel( -# kernel=kernel, -# noise_var=None, -# exact_feval=True, -# optimizer="lbfgs", -# max_iters=1000, -# optimize_restarts=5, -# sparse=False, -# num_inducing=10, -# verbose=False, -# ARD=self.ARD -# ) -# return GPyGaussianProcess(gp_model) -# -# def _optimize(self, f, variables, X_init, Y_init, maxiter, maxeval, -# iter_callback): -# from GPyOpt.methods import BayesianOptimization -# from GPyOpt.core.task.objective import SingleObjective -# -# maxeval = get_maxeval_for_bo(maxeval, maxiter) -# -# kernel_name, message = choose_kernel_if_needed( -# self, variables, X_init, Y_init -# ) -# self.kernel_name = kernel_name -# -# ndim = len(variables) -# -# if len(Y_init) > 0: -# x_best = X_init[0] -# y_best = Y_init[0] -# iter_callback(x_best, y_best, X_init, Y_init, message=message) -# -# gpy_domain = self.get_config_space(variables) -# kernel = self.get_kernel(config_space=gpy_domain) -# -# Y_init = np.array(Y_init).reshape(len(Y_init), -1) -# X_init = np.array(X_init, dtype=float) -# -# gpy_model = self.get_model(config_space=gpy_domain).gp_model -# -# bo = BayesianOptimization(f=f, -# domain=gpy_domain, -# model_type='user-specified', -# model=gpy_model, -# acquisition_type=self.acquisition_type, -# kernel=kernel, -# X=np.array(X_init), -# Y=np.array(Y_init), -# exact_feval=True, -# verbosity=True, -# ) -# bo.num_acquisitions = self.run_info.result.n_eval -# self.run_info.bo_obj = bo -# -# def f_in_gpyopt(X): -# Y = [] -# x_best = self.transform(self.run_info.result.x) -# y_best = self.sign * self.run_info.result.y -# for x in X: -# y = f(x) -# if y_best is None or y < y_best: -# x_best = x -# y_best = y -# Y.append(y) -# iter_callback(x=x_best, y=y_best, X_iter=X, Y_iter=Y) -# return np.array(Y).reshape(len(Y), -1) -# -# bo.f = bo._sign(f_in_gpyopt) -# bo.objective = SingleObjective( -# bo.f, bo.batch_size, bo.objective_name) -# -# bo.run_optimization(max_iter=maxeval-len(X_init), eps=0, -# verbosity=False) -# -# result = OptimizerResult.from_GPyOpt_OptimizerResult(bo) -# self.run_info.result.success = True -# self.run_info.status = result.status -# self.run_info.message = result.message -# return self.run_info.result -# -# -#if GPyOpt_available: -# register_global_optimizer( -# 'GPyOpt_Bayesian_optimization', -# GPyOptBayesianOptimizer -# ) +# class GPyOptBayesianOptimizer(GlobalOptimizer, ConstrainedOptimizer): +# """ +# Class for Bayesian optimization +# """ +# def __init__(self, kernel="Auto", ARD=True, acquisition_type='MPI', +# random_type='resample', custom_rand_gen=None, +# log_transform=False, maximize=False): +# if not GPy_available or not GPyOpt_available: +# raise ValueError("Install GPyOpt and GPy to use " +# "Bayesian optimization.") +# self.kernel_name = kernel +# self.ARD = ARD +# self.acquisition_type = acquisition_type +# super(GPyOptBayesianOptimizer, self).__init__( +# random_type=random_type, +# custom_rand_gen=custom_rand_gen, +# log_transform=log_transform, +# maximize=maximize +# ) +# +# def _get_kernel_class(self): +# if self.kernel_name.lower() == "auto": +# return None +# kernel_name = self.kernel_name.lower().capitalize() +# if kernel_name == "Rbf": +# kernel_name = "RBF" +# return op.attrgetter(kernel_name)(GPy.kern) +# +# def get_kernel(self, config_space): +# kernel = self._get_kernel_class()(len(config_space), ARD=self.ARD) +# return kernel +# +# def get_config_space(self, variables): +# gpy_domain = [] +# for var in variables: +# gpy_domain.append({'name': var.name, +# 'type': var.var_type, +# 'domain': var.domain}) +# return gpy_domain +# +# @staticmethod +# def _write_report_to_stream(variables, run_info, stream): +# +# def get_x_repr(x): +# if isinstance(x, WeightedMetaArray): +# return x.str_as_list() +# return str(list(x)) +# +# bo_obj = run_info.bo_obj +# if bo_obj is not None: +# bo_obj._compute_results() +# x_best = run_info.result.x +# y_best = run_info.result.y +# n_iter = run_info.result.n_iter +# +# print('====================== Iteration %05d ======================' % +# n_iter, file=stream) +# +# if n_iter == 0: +# print("Initial design:", file=stream) +# else: +# print("Got points:", file=stream) +# +# print("Fitness function\tParameters", file=stream) +# for x, y in zip(run_info.result.X_out, run_info.result.Y_out): +# print(f"{y}\t{get_x_repr(x)}", file=stream) +# +# if bo_obj is not None: +# print('\nCurrent state of the model:', file=stream) +# +# print(str(bo_obj.model), file=stream) +# print(bo_obj.model.model.kern.lengthscale, file=stream) +# +# if hasattr(run_info, "gp_train_times"): +# print("\nTime for GP training:", run_info.gp_train_times[n_iter], +# file=stream) +# if hasattr(run_info, "gp_predict_times"): +# print("Time for GP prediction:", +# run_info.gp_predict_times[n_iter], +# file=stream) +# if hasattr(run_info, "acq_opt_times"): +# print("Time for acq. optim.:", +# run_info.acq_opt_times[n_iter], file=stream) +# if hasattr(run_info, "eval_times"): +# print("Time of evaluation:", +# run_info.eval_times[n_iter], file=stream) +# if hasattr(run_info, "iter_times"): +# print("Total time of iteration:", +# run_info.iter_times[n_iter], file=stream) +# +# print('=============================================================', +# end="\n\n", file=stream) +# print('*************************************************************', +# file=stream) +# print('Current optimum: %0.3f' % y_best, file=stream) +# print(f'On parameters: {get_x_repr(x_best)}', file=stream) +# print('*************************************************************', +# file=stream) +# print("\n", file=stream) +# +# def _create_run_info(self): +# """ +# Creates the initial run_info. It has the following fields: +# * `result` - empty :class:`gadma.optimizers.OptimizerResult` with\ +# `n_iter`==-1. +# * `bo_obj` - Object of BO from GpyOpt. +# """ +# run_info = super(GPyOptBayesianOptimizer, self)._create_run_info() +# run_info.bo_obj = None +# return run_info +# +# def valid_restore_file(self, save_file): +# try: +# run_info = self.load(save_file) +# except Exception: +# return False +# if (not isinstance(run_info.result.n_eval, int) or +# not isinstance(run_info.result.n_iter, int)): +# return False +# return True +# +# def save(self, run_info, save_file): +# # run_info.bo_obj could not be deepcopied and pickled so we ignore it +# info = self._create_run_info() +# info.result = copy.deepcopy(run_info.result) +# # also change X_out to be equal to X_total. For good restore +# info.result.X_out = info.result.X +# info.result.Y_out = info.result.Y +# super(GPyOptBayesianOptimizer, self).save(info, save_file) +# +# def load(self, save_file): +# run_info = super(GPyOptBayesianOptimizer, self).load(save_file) +# run_info.result.X_out = list(run_info.result.X) +# run_info.result.Y_out = list(run_info.result.Y) +# return run_info +# +# def get_model(self, config_space): +# from GPyOpt.models import GPModel +# kernel = self.get_kernel(config_space) +# gp_model = GPModel( +# kernel=kernel, +# noise_var=None, +# exact_feval=True, +# optimizer="lbfgs", +# max_iters=1000, +# optimize_restarts=5, +# sparse=False, +# num_inducing=10, +# verbose=False, +# ARD=self.ARD +# ) +# return GPyGaussianProcess(gp_model) +# +# def _optimize(self, f, variables, X_init, Y_init, maxiter, maxeval, +# iter_callback): +# from GPyOpt.methods import BayesianOptimization +# from GPyOpt.core.task.objective import SingleObjective +# +# maxeval = get_maxeval_for_bo(maxeval, maxiter) +# +# kernel_name, message = choose_kernel_if_needed( +# self, variables, X_init, Y_init +# ) +# self.kernel_name = kernel_name +# +# ndim = len(variables) +# +# if len(Y_init) > 0: +# x_best = X_init[0] +# y_best = Y_init[0] +# iter_callback(x_best, y_best, X_init, Y_init, message=message) +# +# gpy_domain = self.get_config_space(variables) +# kernel = self.get_kernel(config_space=gpy_domain) +# +# Y_init = np.array(Y_init).reshape(len(Y_init), -1) +# X_init = np.array(X_init, dtype=float) +# +# gpy_model = self.get_model(config_space=gpy_domain).gp_model +# +# bo = BayesianOptimization(f=f, +# domain=gpy_domain, +# model_type='user-specified', +# model=gpy_model, +# acquisition_type=self.acquisition_type, +# kernel=kernel, +# X=np.array(X_init), +# Y=np.array(Y_init), +# exact_feval=True, +# verbosity=True, +# ) +# bo.num_acquisitions = self.run_info.result.n_eval +# self.run_info.bo_obj = bo +# +# def f_in_gpyopt(X): +# Y = [] +# x_best = self.transform(self.run_info.result.x) +# y_best = self.sign * self.run_info.result.y +# for x in X: +# y = f(x) +# if y_best is None or y < y_best: +# x_best = x +# y_best = y +# Y.append(y) +# iter_callback(x=x_best, y=y_best, X_iter=X, Y_iter=Y) +# return np.array(Y).reshape(len(Y), -1) +# +# bo.f = bo._sign(f_in_gpyopt) +# bo.objective = SingleObjective( +# bo.f, bo.batch_size, bo.objective_name) +# +# bo.run_optimization(max_iter=maxeval-len(X_init), eps=0, +# verbosity=False) +# +# result = OptimizerResult.from_GPyOpt_OptimizerResult(bo) +# self.run_info.result.success = True +# self.run_info.status = result.status +# self.run_info.message = result.message +# return self.run_info.result +# +# +# if GPyOpt_available: +# register_global_optimizer( +# 'GPyOpt_Bayesian_optimization', +# GPyOptBayesianOptimizer +# ) class SMACSquirrelOptimizer(GlobalOptimizer, ConstrainedOptimizer): diff --git a/gadma/optimizers/gaussian_process.py b/gadma/optimizers/gaussian_process.py index de61167..e88e56d 100644 --- a/gadma/optimizers/gaussian_process.py +++ b/gadma/optimizers/gaussian_process.py @@ -1,85 +1,85 @@ -#import numpy as np -#import copy -# -# -#class GaussianProcess(object): -# """ -# Base class to keep Gaussian process for Bayesian optimization. -# """ -# def __init__(self, gp_model): -# self.gp_model = gp_model -# -# def train(self, X, Y, optimize=True): -# raise NotImplementedError -# -# def get_noise(self): -# raise NotImplementedError -# -# def predict(self, X): -# raise NotImplementedError -# -# def get_K(self): -# raise NotImplementedError -# -# def get_hypers(self): -# raise NotImplementedError -# -# -#class GPyGaussianProcess(GaussianProcess): -# def _convert(self, X, Y=None): -# X = np.array(X, dtype=float) -# if Y is not None: -# return X, np.array(Y).reshape(len(Y), -1) -# return X -# -# def train(self, X, Y, optimize=True): -# X, Y = self._convert(X, Y) -# if self.gp_model.model is None: -# self.gp_model._create_model(X=X, Y=Y) -# else: -# self.gp_model.model.set_XY(X, Y) -# if optimize: -# self.gp_model.updateModel(X_all=X, Y_all=Y, X_new=X, Y_new=Y) -# -# def get_noise(self): -# return float(self.gp_model.model.Gaussian_noise.variance) -# -# def predict(self, X): -# X = self._convert(X) -# mu, sigma = self.gp_model.predict(X) -# return mu.reshape(mu.shape[0]), sigma.reshape(sigma.shape[0]) -# -# def get_K(self): -# X = self.gp_model.model.X -# return self.gp_model.model.kern.K(X, X) -# -# def get_hypers(self): -# theta = [] -# theta.extend(self.gp_model.model.kern.lengthscale) -# theta.append(self.get_noise()) -# return theta -# -# -#class SMACGaussianProcess(GaussianProcess): -# def __init__(self, gp_model): -# super(SMACGaussianProcess, self).__init__(gp_model=gp_model) -# self.gp_model.normalize_y = True -# -# def train(self, X, Y, optimize=True): -# self.gp_model._train(X, Y, do_optimize=optimize) -# -# def get_noise(self): -# return 0 -# -# def predict(self, X): -# mu, var = self.gp_model.predict_marginalized_over_instances( -# np.array(X) -# ) -# sigma = np.sqrt(var) -# return mu.reshape(mu.shape[0]), sigma.reshape(sigma.shape[0]) -# -# def get_K(self): -# return self.gp_model.kernel(self.gp_model.gp.X_train_) -# -# def get_hypers(self): -# return np.exp(self.gp_model.hypers) +# import numpy as np +# import copy +# +# +# class GaussianProcess(object): +# """ +# Base class to keep Gaussian process for Bayesian optimization. +# """ +# def __init__(self, gp_model): +# self.gp_model = gp_model +# +# def train(self, X, Y, optimize=True): +# raise NotImplementedError +# +# def get_noise(self): +# raise NotImplementedError +# +# def predict(self, X): +# raise NotImplementedError +# +# def get_K(self): +# raise NotImplementedError +# +# def get_hypers(self): +# raise NotImplementedError +# +# +# class GPyGaussianProcess(GaussianProcess): +# def _convert(self, X, Y=None): +# X = np.array(X, dtype=float) +# if Y is not None: +# return X, np.array(Y).reshape(len(Y), -1) +# return X +# +# def train(self, X, Y, optimize=True): +# X, Y = self._convert(X, Y) +# if self.gp_model.model is None: +# self.gp_model._create_model(X=X, Y=Y) +# else: +# self.gp_model.model.set_XY(X, Y) +# if optimize: +# self.gp_model.updateModel(X_all=X, Y_all=Y, X_new=X, Y_new=Y) +# +# def get_noise(self): +# return float(self.gp_model.model.Gaussian_noise.variance) +# +# def predict(self, X): +# X = self._convert(X) +# mu, sigma = self.gp_model.predict(X) +# return mu.reshape(mu.shape[0]), sigma.reshape(sigma.shape[0]) +# +# def get_K(self): +# X = self.gp_model.model.X +# return self.gp_model.model.kern.K(X, X) +# +# def get_hypers(self): +# theta = [] +# theta.extend(self.gp_model.model.kern.lengthscale) +# theta.append(self.get_noise()) +# return theta +# +# +# class SMACGaussianProcess(GaussianProcess): +# def __init__(self, gp_model): +# super(SMACGaussianProcess, self).__init__(gp_model=gp_model) +# self.gp_model.normalize_y = True +# +# def train(self, X, Y, optimize=True): +# self.gp_model._train(X, Y, do_optimize=optimize) +# +# def get_noise(self): +# return 0 +# +# def predict(self, X): +# mu, var = self.gp_model.predict_marginalized_over_instances( +# np.array(X) +# ) +# sigma = np.sqrt(var) +# return mu.reshape(mu.shape[0]), sigma.reshape(sigma.shape[0]) +# +# def get_K(self): +# return self.gp_model.kernel(self.gp_model.gp.X_train_) +# +# def get_hypers(self): +# return np.exp(self.gp_model.hypers)