Skip to content

Implementing multistart version of theta_est using multiple sampling methods #3575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
262 changes: 262 additions & 0 deletions pyomo/contrib/parmest/parmest.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ def SSE(model):
return expr


'''Adding pseudocode for draft implementation of the estimator class,
incorporating multistart.
'''
class Estimator(object):
"""
Parameter estimation class
Expand Down Expand Up @@ -275,6 +278,11 @@ def __init__(
solver_options=None,
):

'''first theta would be provided by the user in the initialization of
the Estimator class through the unknown parameter variables. Additional
would need to be generated using the sampling method provided by the user.
'''

# check that we have a (non-empty) list of experiments
assert isinstance(experiment_list, list)
self.exp_list = experiment_list
Expand Down Expand Up @@ -447,6 +455,130 @@ def TotalCost_rule(model):
parmest_model = utils.convert_params_to_vars(model, theta_names, fix_vars=False)

return parmest_model

# Make new private method, _generate_initial_theta:
# This method will be used to generate the initial theta values for multistart
# optimization. It will take the theta names and the initial theta values
# and return a dictionary of theta names and their corresponding values.
def _generate_initial_theta(self, parmest_model, seed=None, n_restarts=None, multistart_sampling_method=None, user_provided=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All methods (even private) should have a description of each input, what type is expected, and what it's for in a brief yet coherent sentence.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it's probably a good idea to make sure each of the inputs are valid, in some sense, when called. For instance, if parmest_model is not a model in a specific form, there may be an error. We didn't require a model input for DoE, instead made it None and handled the case where this was sent.

if n_restarts == 1:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like just sending a warning, and not returning. For example, n_restarts might be 1 by default. You should check if n_restarts is an int as well. Then, if n_restarts is 1, you should send a warning that the tool is intended for this number to be greater than one and solve as normal.

# If only one restart, return an empty list
return print("No multistart optimization needed. Please use normal theta_est()")

# Get the theta names and initial theta values
theta_names = self._return_theta_names()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this strings? We went through the effort of trying to avoid using strings to do tasks because they are fragile. Is it possible to stick with suffixes and use the actual component/component ID?

initial_theta = [parmest_model.find_component(name)() for name in theta_names]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to use the suffix for this? The suffix value shouldn't change, but the theta value may if the model has been solved for some reason. I don't know if this is a potential issue but I think that grabbing these values from the suffixes would be more dummy-proof.


# Get the lower and upper bounds for the theta values
lower_bound = np.array([parmest_model.find_component(name).lb for name in theta_names])
upper_bound = np.array([parmest_model.find_component(name).ub for name in theta_names])
# Check if the lower and upper bounds are defined
if any(bound is None for bound in lower_bound) and any(bound is None for bound in upper_bound):
raise ValueError(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably already know this, but you will need to check all the errors are raised when expected.

"The lower and upper bounds for the theta values must be defined."
)

# Check the length of theta_names and initial_theta, and make sure bounds are defined
if len(theta_names) != len(initial_theta):
raise ValueError(
"The length of theta_names and initial_theta must be the same."
)

if multistart_sampling_method == "random":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't necessarily like that the first method is called "random". Aren't most of the methods inherently based on random sampling? It should probably be more explicit, like uniform sampling, or random uniform, or something like this.

np.random.seed(seed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to skip setting the random seed if seed=None (default)?

Copy link
Author

@sscini sscini May 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default is none for all the functions I use that set seed, so if it receives seed = None, it would work as expected. Would skipping it still be best practice?

# Generate random theta values
theta_vals_multistart = np.random.uniform(lower_bound, upper_bound, size=len(theta_names))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any merit in allowing the user to pull from a non-uniform distribution? Unsure what extra machinery is required but maybe for a future PR/enhancement.


# Generate theta values using Latin hypercube sampling or Sobol sampling

elif multistart_sampling_method == "latin_hypercube":
# Generate theta values using Latin hypercube sampling
sampler = scipy.stats.qmc.LatinHypercube(d=len(theta_names), seed=seed)
samples = sampler.random(n=n_restarts)
theta_vals_multistart = np.array([lower_bound + (upper_bound - lower_bound) * theta for theta in samples])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment to explain what you are doing here. I suspect that this function returns an LHS refined on [0, 1] and you are rescaling. But it is best to be explicit.



elif multistart_sampling_method == "sobol":
sampler = scipy.stats.qmc.Sobol(d=len(theta_names), seed=seed)
# Generate theta values using Sobol sampling
# The first value of the Sobol sequence is 0, so we skip it
samples = sampler.random(n=n_restarts+1)[1:]
theta_vals_multistart = np.array([lower_bound + (upper_bound - lower_bound) * theta for theta in samples])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a repeated line of code. What do you think about generating the random uniform samples on [0, 1] and then sharing a common line of code to rescale?


elif multistart_sampling_method == "user_provided":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you probably should have a comment to start each different method that describes what you're doing. It is not entirely obvious what's going on when you haven't yet read the code.

# Add user provided dataframe option
if user_provided is not None:

if isinstance(user_provided, np.ndarray):
# Check if the user provided numpy array has the same number of rows as the number of restarts
if user_provided.shape[0] != n_restarts:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For instance (from last comment), it is unclear that a 2-D numpy array is provided where the first dimension is the number of restarts and the second dimension in the number of theta names, being a N x M matrix representing the N instances of M parameters to be "multi-started".

Copy link
Contributor

@djlaky djlaky May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is especially problematic with N = M, as the users would get results back and not know that they have made a mistake.

raise ValueError(
"The user provided numpy array must have the same number of rows as the number of restarts."
)
# Check if the user provided numpy array has the same number of columns as the number of theta names
if user_provided.shape[1] != len(theta_names):
raise ValueError(
"The user provided numpy array must have the same number of columns as the number of theta names."
)
# Check if the user provided numpy array has the same theta names as the model
# if not, raise an error
# if not all(theta in theta_names for theta in user_provided.columns):
raise ValueError(
"The user provided numpy array must have the same theta names as the model."
)
# If all checks pass, return the user provided numpy array
theta_vals_multistart = user_provided
elif isinstance(user_provided, pd.DataFrame):
# Check if the user provided dataframe has the same number of rows as the number of restarts
if user_provided.shape[0] != n_restarts:
raise ValueError(
"The user provided dataframe must have the same number of rows as the number of restarts."
)
# Check if the user provided dataframe has the same number of columns as the number of theta names
if user_provided.shape[1] != len(theta_names):
raise ValueError(
"The user provided dataframe must have the same number of columns as the number of theta names."
)
# Check if the user provided dataframe has the same theta names as the model
# if not, raise an error
# if not all(theta in theta_names for theta in user_provided.columns):
raise ValueError(
"The user provided dataframe must have the same theta names as the model."
)
# If all checks pass, return the user provided dataframe
theta_vals_multistart = user_provided.iloc[0: len(initial_theta)].values
else:
raise ValueError(
"The user must provide a numpy array or pandas dataframe from a previous attempt to use the 'user_provided' method."
)

else:
raise ValueError(
"Invalid sampling method. Choose 'random', 'latin_hypercube', 'sobol' or 'user_provided'."
)

# Make an output dataframe with the theta names and their corresponding values for each restart,
# and nan for the output info values
df_multistart = pd.DataFrame(
theta_vals_multistart, columns=theta_names
)


# Add the initial theta values to the first row of the dataframe
for i in range(1, n_restarts):
df_multistart.iloc[i, :] = theta_vals_multistart[i, :]
df_multistart.iloc[0, :] = initial_theta


# Add the output info values to the dataframe, starting values as nan
for i in range(len(theta_names)):
df_multistart[f'converged_{theta_names[i]}'] = np.nan
Copy link
Contributor

@djlaky djlaky May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all string characters legal for pandas names? I would think so but this line seems (maybe) dangerous? For instance, what if we start getting into block structuring with multi-index parameters? We should make sure there is a test to ensure the system is robust. I believe I have this on the back burner to make one for Pyomo.DoE as well.

df_multistart["initial objective"] = np.nan
df_multistart["final objective"] = np.nan
df_multistart["solver termination"] = np.nan
df_multistart["solve_time"] = np.nan

return df_multistart

def _instance_creation_callback(self, experiment_number=None, cb_data=None):
model = self._create_parmest_model(experiment_number)
Expand Down Expand Up @@ -921,6 +1053,136 @@ def theta_est(
cov_n=cov_n,
)

def theta_est_multistart(
self,
n_restarts=20,
buffer=10,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to explain the buffer in the doc string.

multistart_sampling_method="random",
user_provided=None,
seed=None,
save_results=False,
theta_vals=None,
solver="ef_ipopt",
file_name = "multistart_results.csv",
return_values=[],
):
"""
Parameter estimation using multistart optimization

Parameters
----------
n_restarts: int, optional
Number of restarts for multistart. Default is 1.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should have n_restarts and method as inputs to this method.

theta_sampling_method: string, optional
Method used to sample theta values. Options are "random", "latin_hypercube", or "sobol".
Default is "random".
solver: string, optional
Copy link
Contributor

@djlaky djlaky May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to pass custom solver options? I'm not super familiar with this but if this is called separate of the standard parmest call, you need to make sure the user can specify their own solver options.

Currently only "ef_ipopt" is supported. Default is "ef_ipopt".
return_values: list, optional
List of Variable names, used to return values from the model for data reconciliation


Returns
-------
objectiveval: float
The objective function value
thetavals: pd.Series
Estimated values for theta
variable values: pd.DataFrame
Variable values for each variable name in return_values (only for solver='ef_ipopt')

"""

# check if we are using deprecated parmest
if self.pest_deprecated is not None:
return print(
"Multistart is not supported in the deprecated parmest interface"
)

assert isinstance(n_restarts, int)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also check that this is > 1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please look at other Pyomo code fgor exampels of throwing exceptions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @adowling2 here, you need to throw an exception so you can test the exception is caught.

assert isinstance(multistart_sampling_method, str)
assert isinstance(solver, str)
assert isinstance(return_values, list)

if n_restarts > 1 and multistart_sampling_method is not None:

# Find the initialized values of theta from the labeled parmest model
# and the theta names from the estimator object
parmest_model = self._create_parmest_model(experiment_number=0)
theta_names = self._return_theta_names()
initial_theta = [parmest_model.find_component(name)() for name in theta_names]

# Generate theta values using the sampling method
results_df = self._generate_initial_theta(parmest_model, seed=seed, n_restarts=n_restarts,
multistart_sampling_method=multistart_sampling_method, user_provided=user_provided)
results_df = pd.DataFrame(results_df)
# Extract theta_vals from the dataframe
theta_vals = results_df.iloc[:, :len(theta_names)]
converged_theta_vals = np.zeros((n_restarts, len(theta_names)))

# make empty list to store results
for i in range(n_restarts):
# for number of restarts, call the self._Q_opt method
# with the theta values generated using the _generalize_initial_theta method

# set the theta values in the model
theta_vals_current = theta_vals.iloc[i, :]


# Call the _Q_opt method with the generated theta values
objectiveval, converged_theta, variable_values = self._Q_opt(
ThetaVals=theta_vals_current,
solver=solver,
return_values=return_values,
)

# Check if the solver terminated successfully
if variable_values.solver.termination_condition != pyo.TerminationCondition.optimal:
# If not, set the objective value to NaN
solver_termination = variable_values.solver.termination_condition
solve_time = variable_values.solver.time
thetavals = np.nan

else:

# If the solver terminated successfully, set the objective value
converged_theta_vals[i, :] = converged_theta.values()
init_objectiveval = objectiveval
final_objectiveval = variable_values.solver.objective()
solver_termination = variable_values.solver.termination_condition
solve_time = variable_values.solver.time

# Check if the objective value is better than the best objective value
if final_objectiveval < best_objectiveval:
best_objectiveval = objectiveval
best_theta = thetavals

# Store the results in a list or DataFrame
# depending on the number of restarts
results_df.iloc[i, len(theta_names):len(theta_names) + len(theta_names)] = converged_theta_vals[i, :]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have the general format of the pandas frame described in text so lines like this can be checked by reviewers. Currently, it is very hard to check to make sure this is correct without running code ourselves.

results_df.iloc[i, -4] = init_objectiveval
results_df.iloc[i, -3] = objectiveval
results_df.iloc[i, -2] = variable_values.solver.termination_condition
results_df.iloc[i, -1] = variable_values.solver.time

# Add buffer to save the dataframe dynamically, if save_results is True
if save_results and (i + 1) % buffer == 0:
mode = 'w' if i + 1 == buffer else 'a'
header = i + 1 == buffer
results_df.to_csv(
file_name, mode=mode, header=header, index=False
)
print(f"Intermediate results saved after {i + 1} iterations.")

# Final save after all iterations
if save_results:
results_df.to_csv(file_name, mode='a', header=False, index=False)
print("Final results saved.")

return results_df, best_theta, best_objectiveval



def theta_est_bootstrap(
self,
bootstrap_samples,
Expand Down