From e7ae4af2783d0a402e684943ac395ff66864e47f Mon Sep 17 00:00:00 2001
From: Kyle Nakamura
Date: Wed, 4 Sep 2024 00:05:24 -0700
Subject: [PATCH] Refactor .neural.fitness

---
 .../neural/fitness/network_weights.py | 149 ++++++++----------
 1 file changed, 64 insertions(+), 85 deletions(-)

diff --git a/src/mlrose_ky/neural/fitness/network_weights.py b/src/mlrose_ky/neural/fitness/network_weights.py
index a91b9f32..4bec2629 100644
--- a/src/mlrose_ky/neural/fitness/network_weights.py
+++ b/src/mlrose_ky/neural/fitness/network_weights.py
@@ -3,6 +3,8 @@
 # Author: Genevieve Hayes
 # License: BSD 3-clause
 
+from typing import Callable
+
 import numpy as np
 import sklearn.metrics as skm
 
@@ -11,75 +13,77 @@
 class NetworkWeights:
-    """Fitness function for neural network weights optimization problem.
+    """
+    Fitness function for neural network weights optimization problem.
 
     Parameters
     ----------
-    X: np.ndarray
+    X : np.ndarray
         Numpy array containing feature dataset with each row representing a single observation.
-
-    y: np.ndarray
+    y : np.ndarray
         Numpy array containing true values of data labels.
-        Length must be same as length of X.
-
-    node_list: list[int]
+        Length must be the same as the length of X.
+    node_list : list[int]
         Number of nodes in each layer, including the input and output layers.
-
-    activation: callable
+    activation : Callable
         Activation function for each of the hidden layers with the signature
-        :code:`activation(x, deriv)`, where setting deriv is a boolean that
+        `activation(x, deriv)`, where deriv is a boolean that
         determines whether to return the activation function or its derivative.
-
-    bias: bool, default: True
+    bias : bool, default=True
         Whether a bias term is included in the network.
-
-    is_classifier: bool, default: True
+    is_classifier : bool, default=True
         Whether the network is for classification or regression. Set True for
         classification and False for regression.
+    learning_rate : float, default=0.1
+        The learning rate for gradient descent updates.
""" - def __init__(self, X, y, node_list: list[int], activation, bias=True, is_classifier=True, learning_rate=0.1): - # Ensure the activation function has the correct signature + def __init__( + self, + X: np.ndarray, + y: np.ndarray, + node_list: list[int], + activation: Callable, + bias: bool = True, + is_classifier: bool = True, + learning_rate: float = 0.1, + ): if not callable(activation): raise TypeError("Activation function must be callable.") try: - activation(np.array([0.1]), deriv=False) # Test the function signature + activation(np.array([0.1]), deriv=False) except TypeError: raise TypeError("Activation function must accept two arguments: 'x' and 'deriv'.") - # Check for empty dataset if X.size == 0 or y.size == 0: raise ValueError("X and y cannot be empty.") - # Make sure y is an array and not a list y = np.array(y) - # Convert y to 2D if necessary if len(np.shape(y)) == 1: y = np.reshape(y, [len(y), 1]) - # Verify X and y are the same length - if not np.shape(X)[0] == np.shape(y)[0]: - raise Exception("""The length of X and y must be equal.""") + if np.shape(X)[0] != np.shape(y)[0]: + raise ValueError(f"The length of X ({np.shape(X)[0]}) and y ({np.shape(y)[0]}) must be equal.") if len(node_list) < 2: - raise Exception("""node_list must contain at least 2 elements.""") + raise ValueError("node_list must contain at least 2 elements.") - if not np.shape(X)[1] == (node_list[0] - bias): - raise Exception("""The number of columns in X must equal %d""" % ((node_list[0] - bias),)) + if np.shape(X)[1] != (node_list[0] - bias): + raise ValueError(f"The number of columns in X must equal {node_list[0] - bias}.") - if not np.shape(y)[1] == node_list[-1]: - raise Exception("""The number of columns in y must equal %d""" % (node_list[-1],)) + if np.shape(y)[1] != node_list[-1]: + raise ValueError(f"The number of columns in y must equal {node_list[-1]}.") if not isinstance(bias, bool): - raise Exception("""bias must be True or False.""") + raise ValueError("bias must be True or False.") if not isinstance(is_classifier, bool): - raise Exception("""is_classifier must be True or False.""") + raise ValueError("is_classifier must be True or False.") if learning_rate <= 0: - raise Exception("""learning_rate must be greater than 0.""") + raise ValueError("learning_rate must be greater than 0.") self.X = X self.y_true = y @@ -89,14 +93,9 @@ def __init__(self, X, y, node_list: list[int], activation, bias=True, is_classif self.is_classifier = is_classifier self.learning_rate = learning_rate - # Determine appropriate loss function and output activation function if self.is_classifier: self.loss = skm.log_loss - - if np.shape(self.y_true)[1] == 1: - self.output_activation = act.sigmoid - else: - self.output_activation = act.softmax + self.output_activation = act.sigmoid if np.shape(self.y_true)[1] == 1 else act.softmax else: self.loss = skm.mean_squared_error self.output_activation = act.identity @@ -106,107 +105,87 @@ def __init__(self, X, y, node_list: list[int], activation, bias=True, is_classif self.weights = [] self.prob_type = "continuous" - nodes = 0 - for i in range(len(node_list) - 1): - nodes += node_list[i] * node_list[i + 1] - - self.nodes = nodes + self.nodes = sum(node_list[i] * node_list[i + 1] for i in range(len(node_list) - 1)) - def evaluate(self, state): - """Evaluate the fitness of a state. + def evaluate(self, state: np.ndarray) -> float: + """ + Evaluate the fitness of a state. Parameters ---------- - state: np.ndarray + state : np.ndarray State array for evaluation. 
         Returns
         -------
-        fitness: float
+        fitness : float
             Value of fitness function.
         """
-        if not len(state) == self.nodes:
-            raise Exception("""state must have length %d""" % (self.nodes,))
+        if len(state) != self.nodes:
+            raise ValueError(f"state must have length {self.nodes}, got {len(state)}.")
 
         self.inputs_list = []
-        self.weights: list = list(unflatten_weights(state, self.node_list))
+        self.weights = list(unflatten_weights(state, self.node_list))
 
-        # Add bias column to inputs matrix, if required
         if self.bias:
             ones = np.ones([np.shape(self.X)[0], 1])
             inputs = np.hstack((self.X, ones))
-
         else:
             inputs = self.X
 
-        # Pass data through network
         for i in range(len(self.weights)):
-            # Multiple inputs by weights
            outputs = np.dot(inputs, self.weights[i])
            self.inputs_list.append(inputs)
 
-            # Transform outputs to get inputs for next layer (or final preds)
-            if i < len(self.weights) - 1:
-                inputs = self.activation(outputs)
-            else:
-                self.y_pred = self.output_activation(outputs)
+            inputs = self.activation(outputs) if i < len(self.weights) - 1 else self.output_activation(outputs)
 
-        # Evaluate loss function
-        fitness = self.loss(self.y_true, self.y_pred)
+        self.y_pred = inputs
+        return self.loss(self.y_true, self.y_pred)
 
-        return fitness
-
-    def get_output_activation(self):
-        """Return the activation function for the output layer.
+    def get_output_activation(self) -> Callable:
+        """
+        Return the activation function for the output layer.
 
         Returns
         -------
-        self.output_activation: callable
+        Callable
             Activation function for the output layer.
         """
         return self.output_activation
 
-    def get_prob_type(self):
-        """Return the problem type.
+    def get_prob_type(self) -> str:
+        """
+        Return the problem type.
 
         Returns
         -------
-        self.prob_type: str
-            Specifies problem type as 'discrete', 'continuous', 'tsp', or
-            'either'.
+        str
+            Problem type as 'discrete', 'continuous', 'tsp', or 'either'.
         """
         return self.prob_type
 
-    def calculate_updates(self):
-        """Calculate gradient descent updates.
+    def calculate_updates(self) -> list[np.ndarray]:
+        """
+        Calculate gradient descent updates.
 
         Returns
         -------
-        updates_list: list
+        list of np.ndarray
            List of back propagation weight updates.
         """
         delta_list: list = []
-        updates_list = []
+        updates_list: list = []
 
-        # Work backwards from final layer
         for i in range(len(self.inputs_list) - 1, -1, -1):
-            # Final layer
             if i == len(self.inputs_list) - 1:
                 delta = self.y_pred - self.y_true
-            # Hidden layers
             else:
                 dot = np.dot(delta_list[-1], np.transpose(self.weights[i + 1]))
                 activation = self.activation(self.inputs_list[i + 1], deriv=True)
                 delta = dot * activation
 
             delta_list.append(delta)
-
-            # Calculate updates
-            updates = -1.0 * self.learning_rate * np.dot(np.transpose(self.inputs_list[i]), delta)
-
+            updates = -self.learning_rate * np.dot(np.transpose(self.inputs_list[i]), delta)
             updates_list.append(updates)
 
-        # Reverse order of updates list
-        updates_list = updates_list[::-1]
-
-        return updates_list
+        return updates_list[::-1]
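
Reviewer note (not part of the patch): below is a minimal usage sketch of the refactored NetworkWeights class. The mlrose_ky import paths and the relu activation helper are assumptions inferred from this file's imports, not confirmed by the diff itself; only NetworkWeights and its checks are taken from the patch above.

import numpy as np

# Assumed import paths; the patch only shows the fitness module itself.
from mlrose_ky.neural.activation import relu
from mlrose_ky.neural.fitness.network_weights import NetworkWeights

# Tiny binary-classification dataset: 4 observations, 2 features.
X = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]])
y = np.array([0, 1, 1, 0])

# Layers: 2 features + 1 bias node -> 2 hidden nodes -> 1 output node.
node_list = [3, 2, 1]

fitness = NetworkWeights(X, y, node_list, activation=relu, bias=True, is_classifier=True, learning_rate=0.1)

# One weight per edge: 3 * 2 + 2 * 1 = 8, matching fitness.nodes.
state = np.random.uniform(-1.0, 1.0, fitness.nodes)

loss = fitness.evaluate(state)         # forward pass; returns log-loss for a classifier
updates = fitness.calculate_updates()  # gradient-descent updates, one array per layer

Note the ordering: calculate_updates() reads the inputs_list and y_pred cached by the preceding evaluate() call, so evaluate() must run first.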