From c0e8ad22d4ab68b860ed273d65518c77ec045fbe Mon Sep 17 00:00:00 2001 From: Guillaume Levasseur Date: Wed, 7 Oct 2020 10:12:30 +0200 Subject: [PATCH] refactor: split AFHMM+SAC disaggregate_thread function into the constraints setup and the actual minimization --- nilmtk_contrib/disaggregate/afhmm.py | 180 +++++++++----------- nilmtk_contrib/disaggregate/afhmm_sac.py | 205 ++++++++++------------- 2 files changed, 176 insertions(+), 209 deletions(-) diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index 63e58dd..7330aa5 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -64,114 +64,100 @@ def partial_fit(self, train_main, train_appliances, **load_kwargs): print ("{}: Finished training".format(self.MODEL_NAME)) - def disaggregate_thread(self, test_mains,index,d): - means_vector = self.means_vector - pi_s_vector = self.pi_s_vector - means_vector = self.means_vector - transmat_vector = self.transmat_vector - - sigma = 100*np.ones((len(test_mains),1)) - flag = 0 - - for epoch in range(6): - # The alernative minimization - if epoch%2==1: - usage = np.zeros((len(test_mains))) - for appliance_id in range(self.num_appliances): - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - usage+=app_usage - sigma = (test_mains.flatten() - usage.flatten()).reshape((-1,1)) - sigma = np.where(sigma<1,1,sigma) + def setup_cvx_constraints(self, n_samples, n_appliances): + cvx_state_vectors = [ + cvx.Variable( + shape=( n_samples, self.default_num_states ), + name="state_vec-{}".format(i) + ) + for i in range(n_appliances) + ] + constraints = [] + for stv in cvx_state_vectors: + # State vector values are ranged. + constraints += [ stv >= 0, stv <= 1 ] + # Sum of states equals 1. + for t in range(n_samples): + constraints.append(cvx.sum(stv[t]) == 1) + # Create variable matrices for each appliance, for each sample. + cvx_variable_matrices = [ + [ + cvx.Variable( + shape=( self.default_num_states, self.default_num_states ), + name="variable_matrix-{}-{}".format(i, t) + ) + for t in range(n_samples) + ] + for i in range(n_appliances) + ] + for i, appli_varmats in enumerate(cvx_variable_matrices): + for t, varmat in enumerate(appli_varmats): + # Assign range constraints to variable matrices. + constraints += [ varmat >= 0, varmat <= 1 ] + # Assign equality constraints with state vectors. + constraints += [ + cvx.sum(varmat[l]) == cvx_state_vectors[i][t-1][l] + for l in range(self.default_num_states) + ] + constraints += [ + cvx.sum((varmat.T)[l]) == cvx_state_vectors[i][t][l] + for l in range(self.default_num_states) + ] + + return cvx_state_vectors, constraints, cvx_variable_matrices + + def disaggregate_thread(self, test_mains): + n_epochs = 6 # don't put in __init__, those are inference epochs! + n_samples = len(test_mains) + sigma = 100*np.ones(( n_samples, 1 )) + cvx_state_vectors, constraints, cvx_varmats = self.setup_cvx_constraints( + n_samples, len(self.means_vector)) + # Preparing first terms of the objective function. + term_1 = 0 + term_2 = 0 + total_appli_energy = np.zeros_like(test_mains) + for i, (appli_name, means) in enumerate(self.means_vector.items()): + total_appli_energy += cvx_state_vectors[i]@means + appli_varmats = cvx_varmats[i] + transmat = self.transmat_vector[appli_name] + for varmat in appli_varmats: + term_1 -= cvx.sum(cvx.multiply(varmat, np.log(transmat))) + + first_hot_state = cvx_state_vectors[i][0] + transition_p = self.pi_s_vector[appli_name] + term_2 -= cvx.sum(cvx.multiply(first_hot_state, np.log(transition_p))) + + for epoch in range(n_epochs): + if epoch % 2: + # Alernative minimization on odd epochs. + usage = np.zeros(( n_samples, )) + for i, (appli_name, means) in enumerate(self.means_vector.items()): + usage += np.sum(s_[i]@means, axis=1) + sigma = (test_mains.flatten() - usage.flatten()).reshape(( -1, 1 )) + sigma = np.where(sigma < 1, 1, sigma) else: - if flag==0: - constraints = [] - cvx_state_vectors = [] - cvx_variable_matrices = [] - delta = cvx.Variable(shape=(len(test_mains),1), name='delta_t') - for appliance_id in range(self.num_appliances): - state_vector = cvx.Variable( - shape=(len(test_mains), self.default_num_states), - name='state_vec-%s'%(appliance_id) - ) - cvx_state_vectors.append(state_vector) - # Enforcing that their values are ranged - constraints+=[cvx_state_vectors[appliance_id]>=0] - constraints+=[cvx_state_vectors[appliance_id]<=1] - # Enforcing that sum of states equals 1 - for t in range(len(test_mains)): # 6c - constraints+=[cvx.sum(cvx_state_vectors[appliance_id][t])==1] - # Creating Variable matrices for every appliance - appliance_variable_matrix = [] - for t in range(len(test_mains)): - matrix = cvx.Variable( - shape=(self.default_num_states, self.default_num_states), - name='variable_matrix-%s-%d'%(appliance_id,t) - ) - appliance_variable_matrix.append(matrix) - cvx_variable_matrices.append(appliance_variable_matrix) - # Enforcing that their values are ranged - for t in range(len(test_mains)): - constraints+=[cvx_variable_matrices[appliance_id][t]>=0] - constraints+=[cvx_variable_matrices[appliance_id][t]<=1] - # Constraint 6e - for t in range(0,len(test_mains)): # 6e - for i in range(self.default_num_states): - constraints+=[cvx.sum(((cvx_variable_matrices[appliance_id][t]).T)[i]) == cvx_state_vectors[appliance_id][t][i]] - # Constraint 6d - for t in range(1,len(test_mains)): # 6d - for i in range(self.default_num_states): - constraints+=[ - cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i] - ] - - total_observed_reading = np.zeros((test_mains.shape)) - # Total observed reading equals the sum of each appliance - for appliance_id in range(self.num_appliances): - total_observed_reading+=cvx_state_vectors[appliance_id]@means_vector[appliance_id] - - # Loss function to be minimized - term_1 = 0 - term_2 = 0 - for appliance_id in range(self.num_appliances): - # First loop is over appliances - variable_matrix = cvx_variable_matrices[appliance_id] - transmat = transmat_vector[appliance_id] - # Next loop is over different time-stamps - for matrix in variable_matrix: - term_1-=cvx.sum(cvx.multiply(matrix,np.log(transmat))) - one_hot_states = cvx_state_vectors[appliance_id] - pi = pi_s_vector[appliance_id] - # The expression involving start states - first_one_hot_states = one_hot_states[0] - term_2-= cvx.sum(cvx.multiply(first_one_hot_states,np.log(pi))) - flag = 1 - - expression = 0 + # Primary minimization on even epochs. term_3 = 0 term_4 = 0 + for t in range(n_samples): + term_3 += .5 * (np.log(sigma[t]**2)) + term_4 += .5 * ((test_mains[t][0] - total_appli_energy[t][0])**2 / (sigma[t]**2)) - for t in range(len(test_mains)): - term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) - term_3+= .5 * (np.log(sigma[t]**2)) - - expression = term_1 + term_2 + term_3 + term_4 - expression = cvx.Minimize(expression) - prob = cvx.Problem(expression, constraints,) - prob.solve(solver=cvx.SCS,verbose=False,warm_start=True) + objective = cvx.Minimize(term_1 + term_2 + term_3 + term_4) + prob = cvx.Problem(objective, constraints) + prob.solve(solver=cvx.SCS, verbose=False, warm_start=True) s_ = [ - np.zeros((len(test_mains), self.default_num_states)) if i.value is None + np.zeros((n_samples, self.default_num_states)) if i.value is None else i.value for i in cvx_state_vectors ] prediction_dict = {} - for appliance_id in range(self.num_appliances): - app_name = self.appliances[appliance_id] - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - prediction_dict[app_name] = app_usage.flatten() + for i, (appli_name, means) in enumerate(self.means_vector.items()): + app_usage = np.sum(s_[i]@means, axis=1) + prediction_dict[appli_name] = app_usage.flatten() - # Store the result in the index corresponding to the thread. - d[index] = pd.DataFrame(prediction_dict,dtype='float32') + return pd.DataFrame(prediction_dict, dtype="float32") def disaggregate_chunk(self, test_mains): # Distributes the test mains across multiple threads and disaggregate in parallel diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 09c32a3..a3a66ce 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -66,126 +66,107 @@ def partial_fit(self, train_main, train_appliances, **load_kwargs): print ("{}: Finished training".format(self.MODEL_NAME)) - def disaggregate_thread(self, test_mains,index,d): - means_vector = self.means_vector - pi_s_vector = self.pi_s_vector - means_vector = self.means_vector - transmat_vector = self.transmat_vector - sigma = 100*np.ones((len(test_mains),1)) - flag = 0 - for epoch in range(6): - if epoch%2==1: - # The alernative Minimization - usage = np.zeros((len(test_mains))) - for appliance_id in range(self.num_appliances): - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - usage+=app_usage - sigma = (test_mains.flatten() - usage.flatten()).reshape((-1,1)) - sigma = np.where(sigma<1,1,sigma) + def setup_cvx_constraints(self, n_samples, n_appliances): + cvx_state_vectors = [ + cvx.Variable( + shape=( n_samples, self.default_num_states ), + name="state_vec-{}".format(i) + ) + for i in range(n_appliances) + ] + constraints = [] + for stv in cvx_state_vectors: + # State vector values are ranged. + constraints += [ stv >= 0, stv <= 1 ] + # Sum of states equals 1. + for t in range(n_samples): + constraints.append(cvx.sum(stv[t]) == 1) + + # Create variable matrices for each appliance, for each sample. + cvx_variable_matrices = [ + [ + cvx.Variable( + shape=( self.default_num_states, self.default_num_states ), + name="variable_matrix-{}-{}".format(i, t) + ) + for t in range(n_samples) + ] + for i in range(n_appliances) + ] + for i, appli_varmats in enumerate(cvx_variable_matrices): + for t, varmat in enumerate(appli_varmats): + # Assign range constraints to variable matrices. + constraints += [ varmat >= 0, varmat <= 1 ] + # Assign equality constraints with state vectors. + constraints += [ + cvx.sum(varmat[l]) == cvx_state_vectors[i][t-1][l] + for l in range(self.default_num_states) + ] + constraints += [ + cvx.sum((varmat.T)[l]) == cvx_state_vectors[i][t][l] + for l in range(self.default_num_states) + ] + + # Constraints on signal aggregates. + for i, (appli_name, signal_aggregate) in enumerate(self.signal_aggregates.items()): + appliance_usage = cvx_state_vectors[i]@self.means_vector[appli_name] + total_appliance_usage = cvx.sum(appliance_usage) + constraints.append(total_appliance_usage <= signal_aggregate) + + return cvx_state_vectors, constraints, cvx_variable_matrices + + def disaggregate_thread(self, test_mains): + n_epochs = 6 # don't put in __init__, those are inference epochs! + n_samples = len(test_mains) + sigma = 100*np.ones(( n_samples, 1 )) + cvx_state_vectors, constraints, cvx_varmats = self.setup_cvx_constraints( + n_samples, len(self.means_vector)) + # Preparing first terms of the objective function. + term_1 = 0 + term_2 = 0 + total_appli_energy = np.zeros_like(test_mains) + for i, (appli_name, means) in enumerate(self.means_vector.items()): + total_appli_energy += cvx_state_vectors[i]@means + appli_varmats = cvx_varmats[i] + transmat = self.transmat_vector[appli_name] + for varmat in appli_varmats: + term_1 -= cvx.sum(cvx.multiply(varmat, np.log(transmat))) + + first_hot_state = cvx_state_vectors[i][0] + transition_p = self.pi_s_vector[appli_name] + term_2 -= cvx.sum(cvx.multiply(first_hot_state, np.log(transition_p))) + + for epoch in range(n_epochs): + if epoch % 2: + # Alernative minimization on odd epochs. + usage = np.zeros(( n_samples, )) + for i, (appli_name, means) in enumerate(self.means_vector.items()): + usage += np.sum(s_[i]@means, axis=1) + sigma = (test_mains.flatten() - usage.flatten()).reshape(( -1, 1 )) + sigma = np.where(sigma < 1, 1, sigma) else: - if flag==0: - constraints = [] - cvx_state_vectors = [] - cvx_variable_matrices = [] - delta = cvx.Variable(shape=(len(test_mains),1), name='delta_t') - - for appliance_id in range(self.num_appliances): - state_vector = cvx.Variable( - shape=(len(test_mains), - self.default_num_states), - name='state_vec-%s'%(appliance_id) - ) - cvx_state_vectors.append(state_vector) - # Enforcing that their values are ranged - constraints+=[cvx_state_vectors[appliance_id]>=0] - constraints+=[cvx_state_vectors[appliance_id]<=1] - # Enforcing that sum of states equals 1 - for t in range(len(test_mains)): # 6c - constraints+=[cvx.sum(cvx_state_vectors[appliance_id][t])==1] - - # Creating Variable matrices for every appliance - appliance_variable_matrix = [] - for t in range(len(test_mains)): - matrix = cvx.Variable( - shape=(self.default_num_states, self.default_num_states), - name='variable_matrix-%s-%d'%(appliance_id,t) - ) - appliance_variable_matrix.append(matrix) - - cvx_variable_matrices.append(appliance_variable_matrix) - # Enforcing that their values are ranged - for t in range(len(test_mains)): - constraints+=[cvx_variable_matrices[appliance_id][t]>=0] - constraints+=[cvx_variable_matrices[appliance_id][t]<=1] - - # Constraint 6e - for t in range(0,len(test_mains)): # 6e - for i in range(self.default_num_states): - constraints += [ - cvx.sum(((cvx_variable_matrices[appliance_id][t]).T)[i]) == cvx_state_vectors[appliance_id][t][i] - ] - - # Constraint 6d - for t in range(1,len(test_mains)): # 6d - for i in range(self.default_num_states): - constraints += [ - cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i] - ] - - for appliance_id in range(self.num_appliances): - appliance_usage = cvx_state_vectors[appliance_id]@means_vector[appliance_id] - total_appliance_usage = cvx.sum(appliance_usage) - constraints += [ - total_appliance_usage <= self.signal_aggregates[self.appliances[appliance_id]] - ] - - # Second order cone constraints - total_observed_reading = np.zeros((test_mains.shape)) - for appliance_id in range(self.num_appliances): - total_observed_reading += cvx_state_vectors[appliance_id]@means_vector[appliance_id] - flag=1 - term_1 = 0 - term_2 = 0 - - for appliance_id in range(self.num_appliances): - # First loop is over appliances - variable_matrix = cvx_variable_matrices[appliance_id] - transmat = transmat_vector[appliance_id] - # Next loop is over different time-stamps - for matrix in variable_matrix: - term_1-=cvx.sum(cvx.multiply(matrix,np.log(transmat))) - one_hot_states = cvx_state_vectors[appliance_id] - pi = pi_s_vector[appliance_id] - # The expression involving start states - first_one_hot_states = one_hot_states[0] - term_2-= cvx.sum(cvx.multiply(first_one_hot_states,np.log(pi))) - - flag = 1 - - expression = 0 + # Primary minimization on even epochs. term_3 = 0 term_4 = 0 - for t in range(len(test_mains)): - term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) - term_3+= .5 * (np.log(sigma[t]**2)) - - expression = term_1 + term_2 + term_3 + term_4 - expression = cvx.Minimize(expression) - prob = cvx.Problem(expression, constraints) - prob.solve(solver=cvx.SCS,verbose=False, warm_start=True) + for t in range(n_samples): + term_3 += .5 * (np.log(sigma[t]**2)) + term_4 += .5 * ((test_mains[t][0] - total_appli_energy[t][0])**2 / (sigma[t]**2)) + + objective = cvx.Minimize(term_1 + term_2 + term_3 + term_4) + prob = cvx.Problem(objective, constraints) + prob.solve(solver=cvx.SCS, verbose=False, warm_start=True) s_ = [ - np.zeros((len(test_mains), self.default_num_states)) if i.value is None - else i.value - for i in cvx_state_vectors + np.zeros((n_samples, self.default_num_states)) if i.value is None + else i.value + for i in cvx_state_vectors ] prediction_dict = {} - for appliance_id in range(self.num_appliances): - app_name = self.appliances[appliance_id] - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - prediction_dict[app_name] = app_usage.flatten() + for i, (appli_name, means) in enumerate(self.means_vector.items()): + app_usage = np.sum(s_[i]@means, axis=1) + prediction_dict[appli_name] = app_usage.flatten() - d[index] = pd.DataFrame(prediction_dict,dtype='float32') + return pd.DataFrame(prediction_dict, dtype="float32") @nilmtk.docinherit.doc_inherit def disaggregate_chunk(self, test_mains):