-
Notifications
You must be signed in to change notification settings - Fork 0
/
matrix_factorization_utilities.py
124 lines (86 loc) · 4.26 KB
/
matrix_factorization_utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python
# coding: utf-8
# In[ ]:
import numpy as np
from scipy.optimize import fmin_cg
# In[ ]:
def normalize_ratings(ratings):
    """
    Center every product's ratings by removing that product's mean rating.

    Means are taken down each column (axis 0) and NaN entries are skipped
    when averaging, so missing ratings do not affect a product's mean.

    :param ratings: 2d array of user ratings (one column per product)
    :return: (ratings with the per-column means subtracted, the per-column means)
    """
    column_means = np.nanmean(ratings, axis=0)
    centered = ratings - column_means
    return centered, column_means
# In[ ]:
def cost(X, *args):
    """
    Cost function for low rank matrix factorization.

    The cost is half the sum of squared prediction errors over the entries
    selected by the mask, plus an L2 penalty on both factor matrices.

    :param X: The matrices being factored (P and Q) rolled up as a contiguous array:
              P (num_users x num_features) first, then Q (num_products x num_features)
    :param args: Array containing (num_users, num_products, num_features, ratings, mask, regularization_amount)
    :return: The cost with the current P and Q matrices
    """
    num_users, num_products, num_features, ratings, mask, regularization_amount = args

    # Unroll P and Q
    split = num_users * num_features
    P = X[:split].reshape(num_users, num_features)
    Q = X[split:].reshape(num_products, num_features)

    # Entries the mask switches off must contribute exactly zero. A plain
    # multiplication is not enough when the rating there is NaN (NaN * 0 is NaN),
    # so those positions are forced to 0 explicitly.
    mask = np.asarray(mask)
    residual = np.where(mask != 0, mask * (np.dot(P, Q.T) - ratings), 0.0)

    # Calculate current cost: squared error term plus the two regularization terms
    squared_error = np.sum(np.square(residual)) / 2
    q_penalty = (regularization_amount / 2.0) * np.sum(np.square(Q))
    p_penalty = (regularization_amount / 2.0) * np.sum(np.square(P))
    return squared_error + q_penalty + p_penalty
# In[ ]:
def gradient(X, *args):
    """
    Calculate the cost gradients with the current P and Q.

    The gradient is laid out exactly like X: the gradient with respect to P
    (num_users x num_features) first, then the gradient with respect to Q
    (num_products x num_features), both flattened.

    :param X: The matrices being factored (P and Q) rolled up as a contiguous array
    :param args: Array containing (num_users, num_products, num_features, ratings, mask, regularization_amount)
    :return: The gradient with the current X
    """
    num_users, num_products, num_features, ratings, mask, regularization_amount = args

    # Unroll P and Q
    split = num_users * num_features
    P = X[:split].reshape(num_users, num_features)
    Q = X[split:].reshape(num_products, num_features)

    # The masked prediction error feeds both gradients, so compute it once.
    # Masked-out entries are forced to 0 explicitly because NaN * 0 is NaN and
    # a NaN rating at an unrated position would otherwise poison the gradient.
    mask = np.asarray(mask)
    residual = np.where(mask != 0, mask * (np.dot(P, Q.T) - ratings), 0.0)

    # Calculate the current gradients for both P and Q
    P_grad = np.dot(residual, Q) + (regularization_amount * P)
    Q_grad = np.dot(residual.T, P) + (regularization_amount * Q)

    # Return the gradients as one rolled-up array as expected by fmin_cg
    return np.concatenate((P_grad.ravel(), Q_grad.ravel()))
# In[ ]:
def low_rank_matrix_factorization(ratings, mask=None, num_features=15, regularization_amount=0.01):
    """
    Factor a ratings array into two latent feature arrays (user features and product features).

    The factors are found by minimizing `cost` with scipy's conjugate-gradient
    optimizer, starting from a fixed (seed 0) random initialization so repeated
    calls on the same input start from the same point.

    :param ratings: Matrix with user ratings to factor (num_users x num_products)
    :param mask: A binary mask of which ratings are present in the ratings array to factor.
                 If omitted, every non-NaN entry of ratings is treated as present.
    :param num_features: Number of latent features to generate for users and products
    :param regularization_amount: How much regularization to apply
    :return: (P, Q) - the factored latent feature arrays, P shaped
             (num_users, num_features) and Q shaped (num_features, num_products)
    """
    # Accept any array-like input and work on floats so NaN handling is well defined
    ratings = np.asarray(ratings, dtype=float)
    num_users, num_products = ratings.shape

    # If no mask is provided, consider all 'NaN' elements as missing and create a mask.
    if mask is None:
        mask = ~np.isnan(ratings)

    # Replace NaN values with zero
    ratings = np.nan_to_num(ratings)

    # Create P and Q and fill with random numbers to start. A private generator
    # seeded with 0 is used instead of np.random.seed(0) so that calling this
    # function does not reset the process-wide random state as a side effect.
    rng = np.random.RandomState(0)
    P = rng.randn(num_users, num_features)
    Q = rng.randn(num_products, num_features)

    # Roll up P and Q into a contiguous array as fmin_cg expects
    initial = np.concatenate((P.ravel(), Q.ravel()))

    # Create an args array as fmin_cg expects
    args = (num_users, num_products, num_features, ratings, mask, regularization_amount)

    # Call fmin_cg to minimize the cost function and thus find the best values for P and Q
    X = fmin_cg(cost, initial, fprime=gradient, args=args, maxiter=3000)

    # Unroll the new P and new Q arrays out of the contiguous array returned by fmin_cg
    split = num_users * num_features
    nP = X[:split].reshape(num_users, num_features)
    nQ = X[split:].reshape(num_products, num_features)

    # Q is returned transposed so that np.dot(P, Q) has the same shape as ratings
    return nP, nQ.T
# In[ ]:
def RMSE(real, predicted):
    """
    Compute the root mean squared error of a set of predictions.

    Positions where the squared difference is NaN (for example, where the real
    rating is missing) are left out of the average.

    :param real: A matrix containing the real ratings (with 'NaN' for any missing elements)
    :param predicted: A matrix of predictions
    :return: The RMSE as a float
    """
    squared_errors = np.square(real - predicted)
    mean_squared_error = np.nanmean(squared_errors)
    return np.sqrt(mean_squared_error)