Integrate comments from review
timmens committed Feb 4, 2025
1 parent 8607490 commit 90c5e6e
Showing 2 changed files with 143 additions and 78 deletions.
112 changes: 68 additions & 44 deletions src/optimagic/optimization/history.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import warnings
 from dataclasses import dataclass
 from functools import partial
@@ -99,17 +101,12 @@ def _get_next_batch_id(self) -> int:
     # Function data, function value, and monotone function value
     # ----------------------------------------------------------------------------------
 
-    def fun_data(
-        self, cost_model: CostModel, monotone: bool = False, dropna: bool = False
-    ) -> pd.DataFrame:
+    def fun_data(self, cost_model: CostModel, monotone: bool = False) -> pd.DataFrame:
         """Return the function value data.
 
         Args:
             cost_model: The cost model that is used to calculate the time measure.
             monotone: Whether to return the monotone function values. Defaults to False.
-            dropna: Whether to drop rows with missing values. These correspond to
-                parameters that were used to calculate a pure jacobian. Defaults to
-                False.
 
         Returns:
             pd.DataFrame: The function value data. The columns are: 'fun', 'time' and
@@ -124,8 +121,9 @@ def fun_data(
         fun = np.array(self.fun, dtype=np.float64)  # converts None to nan
 
         timings = self._get_total_timings(cost_model)
+        task = _task_to_categorical(self.task)
 
-        if not self.is_serial:
+        if not self._is_serial():
             # In the non-serial case, we take the batching into account and reduce
             # timings and fun to one value per batch.
             timings = _apply_reduction_to_batches(
@@ -143,18 +141,16 @@ def fun_data(
                 reduction_function=min_or_max,  # type: ignore[arg-type]
             )
 
-        time = np.cumsum(timings)
-        data = pd.DataFrame({"fun": fun, "time": time})
-
-        if self.is_serial:
-            # In the non-serial case, the task column is meaningless, since multiple
-            # tasks would need to be reduced to one.
-            data["task"] = _task_to_categorical(self.task)
-
-        if dropna:
-            data = data.dropna()
-
-        return data.rename_axis("counter")
+            # Verify that tasks are homogeneous in each batch, and select first if true.
+            tasks_and_batches = pd.DataFrame({"task": task, "batches": self.batches})
+            grouped_tasks = tasks_and_batches.groupby("batches")["task"]
+            if not grouped_tasks.nunique().eq(1).all():
+                raise ValueError("Tasks are not homogeneous in each batch.")
+
+            task = grouped_tasks.first().reset_index(drop=True)
+
+        time = np.cumsum(timings)
+        return pd.DataFrame({"fun": fun, "time": time, "task": task})
 
     @property
     def fun(self) -> list[float | None]:
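
Note: the new homogeneity check reduces the per-evaluation task labels to one label per batch. A minimal standalone sketch of that groupby logic, using made-up task labels and batch ids rather than optimagic's real EvalTask values:

import pandas as pd

# Hypothetical per-evaluation data: two batches, each with a homogeneous task.
tasks_and_batches = pd.DataFrame(
    {"task": ["fun", "fun", "jac", "jac"], "batches": [0, 0, 1, 1]}
)
grouped_tasks = tasks_and_batches.groupby("batches")["task"]

# Each batch must contain exactly one distinct task label.
if not grouped_tasks.nunique().eq(1).all():
    raise ValueError("Tasks are not homogeneous in each batch.")

# Keep the (unique) task of each batch, reindexed from 0.
task = grouped_tasks.first().reset_index(drop=True)
print(task.tolist())  # ['fun', 'jac']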
@@ -191,13 +187,18 @@ def is_accepted(self) -> NDArray[np.bool_]:
     # Parameter data, params, flat params, and flat params names
     # ----------------------------------------------------------------------------------
 
-    def params_data(self, dropna: bool = False) -> pd.DataFrame:
+    def params_data(
+        self, dropna: bool = False, collapse_batches: bool = False
+    ) -> pd.DataFrame:
         """Return the parameter data.
 
         Args:
-            dropna: Whether to drop rows with missing values. These correspond to
-                parameters that were used to calculate a pure jacobian. Defaults to
+            dropna: Whether to drop rows with missing function values. These correspond
+                to parameters that were used to calculate pure jacobians. Defaults to
                 False.
+            collapse_batches: Whether to collapse the batches and only keep the
+                parameters that led to the minimal (or maximal) function value in each
+                batch. Defaults to False.
 
         Returns:
             pd.DataFrame: The parameter data. The columns are: 'name' (the parameter
@@ -210,31 +211,50 @@ def params_data(self, dropna: bool = False) -> pd.DataFrame:
         wide["task"] = _task_to_categorical(self.task)
         wide["fun"] = self.fun
 
-        # In the batch case, we select only the parameters in a batch that led to the
-        # minimal (or maximal) function value in that batch.
-        if not self.is_serial:
+        # If requested, we collapse the batches and only keep the parameters that led to
+        # the minimal (or maximal) function value in each batch.
+        if collapse_batches and not self._is_serial():
             wide["batches"] = self.batches
 
+            # Verify that tasks are homogeneous in each batch
+            if not wide.groupby("batches")["task"].nunique().eq(1).all():
+                raise ValueError("Tasks are not homogeneous in each batch.")
+
+            # We fill nans with inf or -inf to make sure that the idxmin/idxmax is
+            # well-defined, since there is the possibility that all fun values are nans
+            # in a batch.
             if self.direction == Direction.MINIMIZE:
-                loc_of_fun_optimizer = wide.groupby("batches")["fun"].idxmin()
+                loc = (
+                    wide.assign(fun_without_nan=wide["fun"].fillna(np.inf))
+                    .groupby("batches")["fun_without_nan"]
+                    .idxmin()
+                )
             elif self.direction == Direction.MAXIMIZE:
-                loc_of_fun_optimizer = wide.groupby("batches")["fun"].idxmax()
+                loc = (
+                    wide.assign(fun_without_nan=wide["fun"].fillna(-np.inf))
+                    .groupby("batches")["fun_without_nan"]
+                    .idxmax()
+                )
 
-            wide = wide.loc[loc_of_fun_optimizer].drop(columns="batches")
+            wide = wide.loc[loc].drop(columns="batches")
+
+        # We drop rows with missing values if requested. These correspond to parameters
+        # that were used to calculate pure jacobians. This step must be done before
+        # dropping the fun column and before setting the counter.
+        if dropna:
+            wide = wide.dropna()
+        wide = wide.drop(columns="fun")
 
         wide["counter"] = np.arange(len(wide))
 
         long = pd.melt(
             wide,
             var_name="name",
             value_name="value",
-            id_vars=["task", "fun", "counter"],
+            id_vars=["task", "counter"],
         )
 
-        data = long.reindex(columns=["counter", "name", "value", "task", "fun"])
-
-        if dropna:
-            data = data.dropna()
+        data = long.reindex(columns=["counter", "name", "value", "task"])
 
         return data.set_index(["counter", "name"]).sort_index()
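
Note: the fillna guard matters when a whole batch evaluates to NaN. A minimal sketch of the idxmin branch with made-up values (for a MINIMIZE direction):

import numpy as np
import pandas as pd

# Hypothetical data: batch 1 contains only NaN function values.
wide = pd.DataFrame({"fun": [1.0, 0.5, np.nan, np.nan], "batches": [0, 0, 1, 1]})

# Filling NaNs with inf keeps the per-batch arg-min well-defined; without it,
# recent pandas versions raise on the all-NaN batch.
loc = (
    wide.assign(fun_without_nan=wide["fun"].fillna(np.inf))
    .groupby("batches")["fun_without_nan"]
    .idxmin()
)
print(loc.tolist())  # [1, 2] -> best row of batch 0, first row of batch 1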

@@ -326,8 +346,7 @@ def stop_time(self) -> list[float]:
     def batches(self) -> list[int]:
         return self._batches
 
-    @property
-    def is_serial(self) -> bool:
+    def _is_serial(self) -> bool:
         return np.array_equal(self.batches, np.arange(len(self.batches)))
 
     # Tasks
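
Note: the renamed helper treats a history as serial exactly when every evaluation forms its own batch. A one-line illustration of the array comparison:

import numpy as np

print(np.array_equal([0, 1, 2], np.arange(3)))  # True: serial history
print(np.array_equal([0, 0, 1], np.arange(3)))  # False: batched evaluations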
@@ -373,7 +392,7 @@ def __getitem__(self, key: str) -> Any:
 
 
 # ======================================================================================
-# Methods
+# Functions directly used in History methods
 # ======================================================================================
 
 
@@ -442,10 +461,9 @@ def _validate_args_are_all_none_or_lists_of_same_length(
         raise ValueError("All arguments must be lists of the same length or None.")
 
 
-def _task_to_categorical(task: list[EvalTask]) -> pd.Categorical:
-    return pd.Categorical(
-        [t.value for t in task], categories=[t.value for t in EvalTask]
-    )
+def _task_to_categorical(task: list[EvalTask]) -> pd.Series[str]:
+    EvalTaskDtype = pd.CategoricalDtype(categories=[t.value for t in EvalTask])
+    return pd.Series([t.value for t in task], dtype=EvalTaskDtype)
 
 
 def _apply_reduction_to_batches(
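
Note: returning a pd.Series with a fixed CategoricalDtype keeps the full category set even when some tasks never occur. A sketch with illustrative stand-ins for the EvalTask values (not the real enum):

import pandas as pd

all_task_values = ["fun", "jac", "fun_and_jac"]
observed = ["fun", "fun", "jac"]

EvalTaskDtype = pd.CategoricalDtype(categories=all_task_values)
task = pd.Series(observed, dtype=EvalTaskDtype)

# Unobserved tasks remain valid categories of the Series.
print(task.dtype.categories.tolist())  # ['fun', 'jac', 'fun_and_jac']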
@@ -462,7 +480,8 @@ def _apply_reduction_to_batches(
         batch_ids: A list with batch ids whose length is equal to the size of data.
             Values need to be sorted and can be repeated.
         reduction_function: A reduction function that takes an iterable of floats as
-            input (e.g., a numpy.ndarray or list) and returns a scalar.
+            input (e.g., a numpy.ndarray or list of floats) and returns a scalar. The
+            function must be able to handle NaN's.
 
     Returns:
         The transformed data. Has one entry per unique batch id, equal to the result of
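
Note: the sharpened docstring makes the NaN requirement explicit. For instance, np.min propagates NaN while np.nanmin skips it, so the two satisfy the contract in different ways:

import numpy as np

batch = np.array([1.0, np.nan, 0.5])
print(np.min(batch))     # nan -- NaN is propagated
print(np.nanmin(batch))  # 0.5 -- NaN is ignored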
@@ -478,21 +497,26 @@ def _apply_reduction_to_batches(
         batch_id = batch_ids[start]
 
         try:
-            reduced = reduction_function(batch_data)
+            if np.isnan(batch_data).all():
+                reduced = np.nan
+            else:
+                reduced = reduction_function(batch_data)
         except Exception as e:
             msg = (
                 f"Calling function {reduction_function.__name__} on batch {batch_id} "
                 "of the History raised an Exception. Please verify that "
-                f"{reduction_function.__name__} is well-defined, takes a list of "
-                "floats as input, and returns a scalar."
+                f"{reduction_function.__name__} is well-defined, takes an iterable of "
+                "floats as input and returns a scalar. The function must be able to "
+                "handle NaN's."
             )
             raise ValueError(msg) from e
 
         if not np.isscalar(reduced):
             msg = (
                 f"Function {reduction_function.__name__} did not return a scalar for "
                 f"batch {batch_id}. Please verify that {reduction_function.__name__} "
-                "returns a scalar when called on a list of floats."
+                "returns a scalar when called on an iterable of floats. The function "
+                "must be able to handle NaN's."
             )
             raise ValueError(msg)
 
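Note: the new guard short-circuits all-NaN batches before calling the user's reduction function. A hypothetical helper (reduce_batch is not part of the module) mirroring that logic:

import numpy as np

def reduce_batch(batch_data, reduction_function):
    # Skip the reduction and propagate NaN when a batch has no valid values.
    if np.isnan(batch_data).all():
        return np.nan
    return reduction_function(batch_data)

print(reduce_batch(np.array([1.0, np.nan, 0.5]), np.nanmin))  # 0.5
print(reduce_batch(np.array([np.nan, np.nan]), np.nanmin))    # nan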
@@ -504,7 +528,7 @@
 def _get_batch_starts_and_stops(batch_ids: list[int]) -> tuple[list[int], list[int]]:
     """Get start and stop indices of batches.
 
-    This function assumes that batch_ids non-empty and sorted.
+    This function assumes that batch_ids are non-empty and sorted.
     """
     ids_arr = np.array(batch_ids, dtype=np.int64)
 
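Note: for sorted batch ids, the start and stop indices can be sketched with np.unique; this only illustrates the documented assumption, not necessarily the helper's actual implementation:

import numpy as np

batch_ids = [0, 0, 1, 1, 1, 2]
ids_arr = np.array(batch_ids, dtype=np.int64)

# First index of each unique id; each stop is the next start (or the length).
_, starts = np.unique(ids_arr, return_index=True)
stops = np.append(starts[1:], len(ids_arr))
print(starts.tolist(), stops.tolist())  # [0, 2, 5] [2, 5, 6]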
