Refactor .opt_probs (#11)
knakamura13 committed Sep 5, 2024
1 parent a959586 commit 631ad9b
Showing 9 changed files with 700 additions and 471 deletions.
205 changes: 109 additions & 96 deletions src/mlrose_ky/opt_probs/continuous_opt.py
@@ -1,5 +1,7 @@
"""Classes for defining optimization problem objects."""

from typing import Any

# Authors: Genevieve Hayes (modified by Andrew Rollings, Kyle Nakamura)
# License: BSD 3-clause

@@ -13,195 +15,202 @@ class ContinuousOpt(_OptProb):
Parameters
----------
length: int
Number of elements in state vector.
length : int
Number of elements in the state vector.
fitness_fn: fitness function object
Object to implement fitness function for optimization.
fitness_fn : Any
Fitness function for optimization.
maximize: bool, default: True
maximize : bool, default=True
Whether to maximize the fitness function.
Set :code:`False` for minimization problem.
min_val: float, default: 0
min_val : float, default=0
Minimum value that each element of the state vector can take.
max_val: float, default: 1
max_val : float, default=1
Maximum value that each element of the state vector can take.
step: float, default: 0.1
Step size used in determining neighbors of current state.
"""
step : float, default=0.1
Step size used in determining neighbors of the current state.
def __init__(self, length, fitness_fn, maximize=True, min_val=0, max_val=1, step=0.1):
Attributes
----------
min_val : float
Minimum value for the state vector elements.
max_val : float
Maximum value for the state vector elements.
step : float
Step size used in neighbor determination.
prob_type : str
Problem type; always 'continuous' for this class.
"""

_OptProb.__init__(self, length, fitness_fn, maximize=maximize)
def __init__(self, length: int, fitness_fn: Any, maximize: bool = True, min_val: float = 0.0, max_val: float = 1.0, step: float = 0.1):
super().__init__(length, fitness_fn, maximize=maximize)

if (self.fitness_fn.get_prob_type() != "continuous") and (self.fitness_fn.get_prob_type() != "either"):
raise Exception(
"fitness_fn must have problem type 'continuous'"
+ """ or 'either'. Define problem as"""
+ """ DiscreteOpt problem or use alternative"""
+ """ fitness function."""
if self.fitness_fn.get_prob_type() not in {"continuous", "either"}:
raise ValueError(
"fitness_fn must have problem type 'continuous' or 'either'. "
"Define problem as DiscreteOpt or use an appropriate fitness function."
)

if max_val <= min_val:
raise Exception("""max_val must be greater than min_val.""")

raise ValueError("max_val must be greater than min_val.")
if step <= 0:
raise Exception("""step size must be positive.""")

raise ValueError("step size must be positive.")
if (max_val - min_val) < step:
raise Exception("""step size must be less than""" + """ (max_val - min_val).""")
raise ValueError("step size must be less than (max_val - min_val).")

self.min_val = min_val
self.max_val = max_val
self.step = step
self.prob_type = "continuous"
self.min_val: float = min_val
self.max_val: float = max_val
self.step: float = step
self.prob_type: str = "continuous"

def calculate_updates(self):
def calculate_updates(self) -> list:
"""Calculate gradient descent updates.
Returns
-------
updates: list
List of back propagation weight updates.
list
List of back-propagation weight updates.
"""
updates = self.fitness_fn.calculate_updates()
return self.fitness_fn.calculate_updates()

return updates

def find_neighbors(self):
def find_neighbors(self) -> None:
"""Find all neighbors of the current state."""
# Pre-allocate a NumPy array for neighbors (maximum of 2 * length neighbors)
neighbors_matrix = np.zeros((2 * self.length, self.length))

self.neighbors = []
neighbor_count = 0 # Track how many valid neighbors we find

for i in range(self.length):
for j in [-1, 1]:
neighbor = np.copy(self.state)
neighbor[i] += j * self.step
base_neighbor = np.copy(self.state)

for step_dir in [-self.step, self.step]:
neighbor = base_neighbor.copy()
neighbor[i] += step_dir

if neighbor[i] > self.max_val:
neighbor[i] = self.max_val
# Clip the value to ensure it remains within bounds
neighbor[i] = np.clip(neighbor[i], self.min_val, self.max_val)

elif neighbor[i] < self.min_val:
neighbor[i] = self.min_val
# Only add if different from the current state
if not np.array_equal(neighbor, self.state):
neighbors_matrix[neighbor_count] = neighbor
neighbor_count += 1

if not np.array_equal(np.array(neighbor), self.state):
self.neighbors.append(neighbor)
# Store only the valid neighbors (slice the array up to the count of valid ones)
self.neighbors = neighbors_matrix[:neighbor_count]

def get_prob_type(self):
def get_prob_type(self) -> str:
"""Return the problem type.
Returns
-------
self.prob_type: str
Returns problem type.
str
Problem type.
"""
return self.prob_type

def random(self):
"""Return a random state vector.
def random(self) -> np.ndarray:
"""Generate and return a random state vector.
Returns
-------
state: np.ndarray
np.ndarray
Randomly generated state vector.
"""
state = np.random.uniform(self.min_val, self.max_val, self.length)
return np.random.uniform(self.min_val, self.max_val, self.length)

return state

def random_neighbor(self):
"""Return random neighbor of current state vector.
def random_neighbor(self) -> np.ndarray:
"""Return a random neighbor of the current state vector.
Returns
-------
neighbor: np.ndarray
State vector of random neighbor.
np.ndarray
State vector of the random neighbor.
"""
while True:
neighbor = np.copy(self.state)
i = np.random.randint(0, self.length)

neighbor[i] += self.step * np.random.choice([-1, 1])

if neighbor[i] > self.max_val:
neighbor[i] = self.max_val

elif neighbor[i] < self.min_val:
neighbor[i] = self.min_val

if not np.array_equal(np.array(neighbor), self.state):
if not np.array_equal(neighbor, self.state):
break

return neighbor

def random_pop(self, pop_size):
def random_pop(self, pop_size: int) -> None:
"""Create a population of random state vectors.
Parameters
----------
pop_size: int
Size of population to be created.
pop_size : int
Size of the population to be created.
Raises
------
ValueError
If pop_size is not a positive integer.
"""
if pop_size <= 0:
raise Exception("""pop_size must be a positive integer.""")
elif not isinstance(pop_size, int):
if pop_size.is_integer():
pop_size = int(pop_size)
else:
raise Exception("""pop_size must be a positive integer.""")
if pop_size <= 0 or not isinstance(pop_size, int):
raise ValueError("pop_size must be a positive integer.")

population = []
pop_fitness = []

for _ in range(pop_size):
state = self.random()
fitness = self.eval_fitness(state)

population.append(state)
pop_fitness.append(fitness)

self.population = np.array(population)
self.pop_fitness = np.array(pop_fitness)

def reproduce(self, parent_1, parent_2, mutation_prob=0.1):
"""Create child state vector from two parent state vectors.
def reproduce(self, parent_1: np.ndarray, parent_2: np.ndarray, mutation_prob: float = 0.1) -> np.ndarray:
"""Create a child state vector from two parent state vectors.
Parameters
----------
parent_1: np.ndarray
parent_1 : np.ndarray
State vector for parent 1.
parent_2: np.ndarray
parent_2 : np.ndarray
State vector for parent 2.
mutation_prob: float
Probability of a mutation at each state vector element during
reproduction.
mutation_prob : float, default=0.1
Probability of a mutation at each state vector element during reproduction.
Returns
-------
child: np.ndarray
np.ndarray
Child state vector produced from parents 1 and 2.
Raises
------
ValueError
If the lengths of the parents do not match the problem length,
or if mutation_prob is not between 0 and 1.
"""
if len(parent_1) != self.length or len(parent_2) != self.length:
raise Exception("""Lengths of parents must match problem length""")
raise ValueError("Lengths of parents must match problem length.")

if (mutation_prob < 0) or (mutation_prob > 1):
raise Exception("""mutation_prob must be between 0 and 1.""")
if not (0 <= mutation_prob <= 1):
raise ValueError("mutation_prob must be between 0 and 1.")

# Reproduce parents
if self.length > 1:
_n = np.random.randint(self.length - 1)
child = np.array([0.0] * self.length)
child = np.zeros(self.length)
child[0 : _n + 1] = parent_1[0 : _n + 1]
child[_n + 1 :] = parent_2[_n + 1 :]
elif np.random.randint(2) == 0:
child = np.copy(parent_1)
else:
child = np.copy(parent_2)
child = np.copy(parent_1 if np.random.randint(2) == 0 else parent_2)

# Mutate child
rand = np.random.uniform(size=self.length)
@@ -212,30 +221,34 @@ def reproduce(self, parent_1, parent_2, mutation_prob=0.1):

return child

def reset(self):
"""Set the current state vector to a random value and get its fitness."""
def reset(self) -> None:
"""Set the current state vector to a random value and reset its fitness."""
self.state = self.random()
self.fitness_evaluations = 0
self.fitness = self.eval_fitness(self.state)

def update_state(self, updates):
"""Update current state given a vector of updates.
def update_state(self, updates: np.ndarray) -> np.ndarray:
"""Update the current state given a vector of updates.
Parameters
----------
updates: np.ndarray
Update array.
updates : np.ndarray
Array of updates.
Returns
-------
updated_state: np.ndarray
np.ndarray
Current state adjusted for updates.
Raises
------
ValueError
If the length of updates does not match the problem length.
"""
if len(updates) != self.length:
raise Exception("""Length of updates must match problem length""")
raise ValueError("Length of updates must match problem length.")

updated_state = self.state + updates

updated_state[updated_state > self.max_val] = self.max_val
updated_state[updated_state < self.min_val] = self.min_val

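For orientation only (not part of this commit), a minimal usage sketch of the refactored class. It assumes ContinuousOpt is importable from mlrose_ky.opt_probs and that the base _OptProb.eval_fitness expects a fitness object exposing evaluate(state) alongside the get_prob_type() method checked in __init__; SphereFitness below is a hypothetical stand-in for illustration.

import numpy as np

from mlrose_ky.opt_probs import ContinuousOpt


class SphereFitness:
    """Toy continuous fitness: negative sum of squares, maximized at the origin."""

    def evaluate(self, state: np.ndarray) -> float:
        return -float(np.sum(state**2))

    def get_prob_type(self) -> str:
        return "continuous"


problem = ContinuousOpt(length=3, fitness_fn=SphereFitness(), maximize=True, min_val=-1.0, max_val=1.0, step=0.1)

problem.reset()                       # draw a random state in [-1.0, 1.0]^3 and evaluate its fitness
neighbor = problem.random_neighbor()  # one coordinate nudged by +/- step, clipped to the bounds
problem.find_neighbors()              # stores at most 2 * length distinct neighbors on problem.neighbors
print(problem.state, neighbor, len(problem.neighbors))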

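Continuing the same sketch (same assumptions, reusing the problem object and imports above), the population helpers touched by this commit, random_pop, reproduce, and update_state, can be exercised like this:

problem.random_pop(6)                        # six random states plus their fitness values
parent_1, parent_2 = problem.population[:2]

child = problem.reproduce(parent_1, parent_2, mutation_prob=0.1)  # one-point crossover, then per-element mutation
new_state = problem.update_state(np.full(problem.length, 0.05))   # element-wise updates, clipped to [min_val, max_val]
print(child.shape, problem.pop_fitness.shape, new_state)
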
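One behavioral change worth noting is that validation now raises ValueError rather than a bare Exception, so callers can catch the more specific type. A quick illustration under the same assumptions as above:

try:
    ContinuousOpt(length=3, fitness_fn=SphereFitness(), min_val=1.0, max_val=0.0)
except ValueError as exc:
    print(exc)  # -> max_val must be greater than min_val.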