diff --git a/polara/datasets/movielens.py b/polara/datasets/movielens.py
index 79882cf..f376179 100644
--- a/polara/datasets/movielens.py
+++ b/polara/datasets/movielens.py
@@ -1,4 +1,5 @@
 from io import BytesIO
+import numpy as np
 import pandas as pd
 
 try:
@@ -8,9 +9,14 @@
 
 
 def get_movielens_data(local_file=None, get_ratings=True, get_genres=False,
-                       split_genres=True, mdb_mapping=False):
+                       split_genres=True, mdb_mapping=False, get_tags=False, include_time=False):
     '''Downloads movielens data and stores it in pandas dataframe.
     '''
+    fields = ['userid', 'movieid', 'rating']
+
+    if include_time:
+        fields.append('timestamp')
+
     if not local_file:
         # downloading data
         from requests import get
@@ -20,7 +26,7 @@ def get_movielens_data(local_file=None, get_ratings=True, get_genres=False,
     else:
         zip_contents = local_file
 
-    ml_data = ml_genres = mapping = None
+    ml_data = ml_genres = ml_tags = mapping = None
     # loading data into memory
     with ZipFile(zip_contents) as zfile:
         zip_files = pd.Series(zfile.namelist())
@@ -33,9 +39,7 @@ def get_movielens_data(local_file=None, get_ratings=True, get_genres=False,
             zdata = zdata.replace(b'::', delimiter.encode())
             # makes data compatible with pandas c-engine
             # returns string objects instead of bytes in that case
-            ml_data = pd.read_csv(BytesIO(zdata), sep=delimiter, header=header, engine='c',
-                                  names=['userid', 'movieid', 'rating', 'timestamp'],
-                                  usecols=['userid', 'movieid', 'rating'])
+            ml_data = pd.read_csv(BytesIO(zdata), sep=delimiter, header=header, engine='c', names=fields, usecols=fields)
 
         if get_genres:
             zip_file = zip_files[zip_files.str.contains('movies')].iat[0]
@@ -51,6 +55,19 @@ def get_movielens_data(local_file=None, get_ratings=True, get_genres=False,
 
             ml_genres = get_split_genres(genres_data) if split_genres else genres_data
 
+        if get_tags:
+            zip_file = zip_files[zip_files.str.contains('/tags')].iat[0] #not genome
+            zdata = zfile.read(zip_file)
+            if not is_new_format:
+                # make data compatible with pandas c-engine
+                # pandas returns string objects instead of bytes in that case
+                delimiter = '^'
+                zdata = zdata.replace(b'::', delimiter.encode())
+            fields[2] = 'tag'
+            ml_tags = pd.read_csv(BytesIO(zdata), sep=delimiter, header=header,
+                                  engine='c', encoding='latin1',
+                                  names=fields, usecols=range(len(fields)))
+
         if mdb_mapping and is_new_format:
             # imdb and tmdb mapping - exists only in ml-latest or 20m datasets
             zip_file = zip_files[zip_files.str.contains('links')].iat[0]
@@ -58,7 +75,7 @@ def get_movielens_data(local_file=None, get_ratings=True, get_genres=False,
             mapping = pd.read_csv(zdata, sep=',', header=0, engine='c',
                                   names=['movieid', 'imdbid', 'tmdbid'])
 
-    res = [data for data in [ml_data, ml_genres, mapping] if data is not None]
+    res = [data for data in [ml_data, ml_genres, ml_tags, mapping] if data is not None]
     if len(res)==1: res = res[0]
     return res
@@ -75,7 +92,7 @@ def filter_short_head(data, threshold=0.01):
     short_head.sort_values(ascending=False, inplace=True)
 
     ratings_perc = short_head.cumsum()*1.0/short_head.sum()
-    movies_perc = pd.np.arange(1, len(short_head)+1, dtype=pd.np.float64) / len(short_head)
+    movies_perc = np.arange(1, len(short_head)+1, dtype='f8') / len(short_head)
 
     long_tail_movies = ratings_perc[movies_perc > threshold].index
     return long_tail_movies
diff --git a/polara/evaluation/evaluation_engine.py b/polara/evaluation/evaluation_engine.py
index 482375b..d5476eb 100644
--- a/polara/evaluation/evaluation_engine.py
+++ b/polara/evaluation/evaluation_engine.py
@@ -5,9 +5,23 @@
 except NameError:
     pass
 
+from math import sqrt
 import pandas as pd
 
 
+def sample_ci(df, coef=2.776, level=None): # 95% CI for sample under Student's t-test
+    # http://www.stat.yale.edu/Courses/1997-98/101/confint.htm
+    # example from http://onlinestatbook.com/2/estimation/mean.html
+    nlevels = df.index.nlevels
+    if (nlevels == 1) & (level is None):
+        n = df.shape[0]
+    elif (nlevels==2) & (level is not None):
+        n = df.index.levshape[1-level]
+    else:
+        raise ValueError
+    return coef * df.std(level=level, ddof=1) / sqrt(n)
+
+
 def save_scores(scores, dataset_name, experiment_name, save_folder=None):
     experiment_keys = scores.keys()
     save_folder = save_folder or 'results'
@@ -49,9 +63,10 @@ def set_topk(models, topk):
         model.topk = topk
 
 
-def build_models(models):
+def build_models(models, force=True):
     for model in models:
-        model.build()
+        if not model._is_ready or force:
+            model.build()
 
 
 def consolidate(scores, params, metrics):
@@ -90,14 +105,13 @@ def holdout_test_pair(model1, model2, holdout_sizes=[1], metrics=['hits']):
     return consolidate(holdout_scores, holdout_sizes, metrics)
 
 
-def holdout_test(models, holdout_sizes=[1], metrics=['hits']):
+def holdout_test(models, holdout_sizes=[1], metrics=['hits'], force_build=True):
     holdout_scores = []
     data = models[0].data
     assert all([model.data is data for model in models[1:]]) #check that data is shared across models
 
-    build_models(models)
+    build_models(models, force_build)
     for i in holdout_sizes:
-        print(i, end=' ')
         data.holdout_size = i
         data.update()
@@ -107,7 +121,7 @@ def holdout_test(models, holdout_sizes=[1], metrics=['hits']):
     return consolidate(holdout_scores, holdout_sizes, metrics)
 
 
-def topk_test(models, topk_list=[10], metrics=['hits']):
+def topk_test(models, topk_list=[10], metrics=['hits'], force_build=True):
     topk_scores = []
     data = models[0].data
     assert all([model.data is data for model in models[1:]]) #check that data is shared across models
@@ -115,9 +129,8 @@ def topk_test(models, topk_list=[10], metrics=['hits']):
     data.update()
 
     topk_list = list(reversed(sorted(topk_list))) #start from max topk and rollback
-    build_models(models)
+    build_models(models, force_build)
     for topk in topk_list:
-        print(topk, end=' ')
         metric_scores = evaluate_models(models, metrics, topk)
         topk_scores.append(metric_scores)
diff --git a/polara/evaluation/pipelines.py b/polara/evaluation/pipelines.py
new file mode 100644
index 0000000..99fcb5e
--- /dev/null
+++ b/polara/evaluation/pipelines.py
@@ -0,0 +1,34 @@
+from __future__ import print_function
+
+from operator import mul as mul_op
+from functools import reduce
+from itertools import product
+from random import choice
+
+
+def random_chooser():
+    while True:
+        values = yield
+        yield choice(values)
+
+
+def random_grid(params, n=60, grid_cache=None):
+    if not isinstance(n, int):
+        raise TypeError('n must be an integer, not {}'.format(type(n)))
+    if n < 0:
+        raise ValueError('n should be >= 0')
+
+    grid = grid_cache or set()
+    max_n = reduce(mul_op, [len(vals) for vals in params.values()])
+    n = min(n if n > 0 else max_n, max_n)
+    param_chooser = random_chooser()
+    try:
+        while len(grid) < n:
+            level_choice = []
+            for v in params.values():
+                next(param_chooser)
+                level_choice.append(param_chooser.send(v))
+            grid.add(tuple(level_choice))
+    except KeyboardInterrupt:
+        print('Interrupted by user. Providing current results.')
+    return grid
diff --git a/polara/lib/hosvd.py b/polara/lib/hosvd.py
index ee4dae4..a9494ba 100644
--- a/polara/lib/hosvd.py
+++ b/polara/lib/hosvd.py
@@ -6,46 +6,40 @@
     pass
 
 import numpy as np
-from numba import jit
-
-
-@jit(nopython=True, nogil=True)
-def double_tensordot(idx, val, U, V, new_shape1, new_shape2, ten_mode0, ten_mode1, ten_mode2, res):
-    I = idx.shape[0]
-    J = new_shape1
-    K = new_shape2
-    for i in range(I):
-        i0 = idx[i, ten_mode0]
-        i1 = idx[i, ten_mode1]
-        i2 = idx[i, ten_mode2]
-        for j in range(J):
-            for k in range(K):
-                res[i0, j, k] += val[i] * U[i1, j] * V[i2, k]
-
-
-def tensordot2(idx, val, shape, U, V, modes):
-    ten_mode1, mat_mode1 = modes[0]
-    ten_mode2, mat_mode2 = modes[1]
-
-    ten_mode0, = [x for x in (0, 1, 2) if x not in (ten_mode1, ten_mode2)]
-    new_shape = (shape[ten_mode0], U.shape[1-mat_mode1], V.shape[1-mat_mode2])
-    res = np.zeros(new_shape)
-
-    if mat_mode1 == 1:
-        vU = U.T
-    else:
-        vU = U
-
-    if mat_mode2 == 1:
-        vV = V.T
-    else:
-        vV = V
-
-    double_tensordot(idx, val, vU, vV, new_shape[1], new_shape[2], ten_mode0, ten_mode1, ten_mode2, res)
+from scipy.sparse.linalg import svds
+from numba import njit
+
+
+@njit(nogil=True)
+def double_tensordot(idx, val, u, v, mode0, mode1, mode2, res):
+    new_shape1 = u.shape[1]
+    new_shape2 = v.shape[1]
+    for i in range(len(val)):
+        i0 = idx[i, mode0]
+        i1 = idx[i, mode1]
+        i2 = idx[i, mode2]
+        vi = val[i]
+        for j in range(new_shape1):
+            for k in range(new_shape2):
+                res[i0, j, k] += vi * u[i1, j] * v[i2, k]
+
+
+def tensordot2(idx, val, shape, U, V, modes, dtype=None):
+    mode1, mat_mode1 = modes[0]
+    mode2, mat_mode2 = modes[1]
+
+    u = U.T if mat_mode1 == 1 else U
+    v = V.T if mat_mode2 == 1 else V
+
+    mode0, = [x for x in (0, 1, 2) if x not in (mode1, mode2)]
+    new_shape = (shape[mode0], U.shape[1-mat_mode1], V.shape[1-mat_mode2])
+
+    res = np.zeros(new_shape, dtype=dtype)
+    double_tensordot(idx, val, u, v, mode0, mode1, mode2, res)
     return res
 
 
-def tucker_als(idx, val, shape, core_shape, iters=25, growth_tol=0.01, batch_run=False):
+def tucker_als(idx, val, shape, core_shape, iters=25, growth_tol=0.01, batch_run=False, seed=None):
     '''
     The function computes Tucker ALS decomposition of sparse tensor provided in COO format.
     Usage:
@@ -55,45 +49,33 @@ def log_status(msg):
         if not batch_run:
             print(msg)
 
-    if not (idx.flags.c_contiguous and val.flags.c_contiguous):
-        raise ValueError('Warning! Imput arrays must be C-contigous.')
-
-
-    #TODO: it's better to implement check for future
-    #if np.any(idx[1:, 0]-idx[:-1, 0]) < 0):
-    #    print('Warning! Index array must be sorted by first column in ascending order.')
+    random_state = np.random if seed is None else np.random.RandomState(seed)
 
     r0, r1, r2 = core_shape
-
-    u1 = np.random.rand(shape[1], r1)
+    u1 = random_state.rand(shape[1], r1)
     u1 = np.linalg.qr(u1, mode='reduced')[0]
-
-    u2 = np.random.rand(shape[2], r2)
+    u2 = random_state.rand(shape[2], r2)
     u2 = np.linalg.qr(u2, mode='reduced')[0]
 
-    u1 = np.ascontiguousarray(u1)
-    u2 = np.ascontiguousarray(u2)
-
     g_norm_old = 0
-
     for i in range(iters):
         log_status('Step %i of %i' % (i+1, iters))
         u0 = tensordot2(idx, val, shape, u2, u1, ((2, 0), (1, 0)))\
             .reshape(shape[0], r1*r2)
-        uu = np.linalg.svd(u0, full_matrices=0)[0]
-        u0 = np.ascontiguousarray(uu[:, :r0])
+        uu = svds(u0, k=r0, return_singular_vectors='u')[0]
+        u0 = np.ascontiguousarray(uu[:, ::-1])
 
         u1 = tensordot2(idx, val, shape, u2, u0, ((2, 0), (0, 0)))\
             .reshape(shape[1], r0*r2)
-        uu = np.linalg.svd(u1, full_matrices=0)[0]
-        u1 = np.ascontiguousarray(uu[:, :r1])
+        uu = svds(u1, k=r1, return_singular_vectors='u')[0]
+        u1 = np.ascontiguousarray(uu[:, ::-1])
 
         u2 = tensordot2(idx, val, shape, u1, u0, ((1, 0), (0, 0)))\
             .reshape(shape[2], r0*r1)
-        uu, ss, vv = np.linalg.svd(u2, full_matrices=0)
-        u2 = np.ascontiguousarray(uu[:, :r2])
+        uu, ss, vv = svds(u2, k=r2)
+        u2 = np.ascontiguousarray(uu[:, ::-1])
 
-        g_norm_new = np.linalg.norm(ss[:r2])
+        g_norm_new = np.linalg.norm(ss)
         g_growth = (g_norm_new - g_norm_old) / g_norm_new
         g_norm_old = g_norm_new
         log_status('growth of the core: %f' % g_growth)
@@ -101,7 +83,7 @@ def log_status(msg):
             log_status('Core is no longer growing. Norm of the core: %f' % g_norm_old)
             break
 
-    g = ss[:r2, np.newaxis] * vv[:r2, :]
+    g = np.ascontiguousarray((ss[:, np.newaxis] * vv)[::-1, :])
     g = g.reshape(r2, r1, r0).transpose(2, 1, 0)
     log_status('Done')
     return u0, u1, u2, g
diff --git a/polara/lib/optimize.py b/polara/lib/optimize.py
index 4dcbca8..7942f4b 100644
--- a/polara/lib/optimize.py
+++ b/polara/lib/optimize.py
@@ -1,7 +1,7 @@
 import numpy as np
-from numba import jit
+from numba import njit
 
-@jit(nopython=True, nogil=True)
+@njit(nogil=True)
 def sgd_step(users_idx, items_idx, feedbacks, P, Q, eta, lambd):
     cum_error = 0
     for k, a in enumerate(feedbacks):
@@ -22,7 +22,7 @@ def sgd_step(users_idx, items_idx, feedbacks, P, Q, eta, lambd):
         cum_error += e*e
     return cum_error
 
-@jit(nopython=True, nogil=True)
+@njit(nogil=True)
 def sgd_step_biased(users_idx, items_idx, feedbacks, P, Q, b_user, b_item, mu, eta, lambd):
     cum_error = 0
     for k, a in enumerate(feedbacks):
diff --git a/polara/lib/sparse.py b/polara/lib/sparse.py
index 11bde63..227a5fd 100644
--- a/polara/lib/sparse.py
+++ b/polara/lib/sparse.py
@@ -6,13 +6,15 @@
 import numpy as np
 from scipy.sparse import csr_matrix
-from numba import jit
+from numba import njit, guvectorize
+from numba import float64 as f8
+from numba import intp as ip
+
+from polara.recommender import defaults
 
 # matvec implementation is based on
 # http://stackoverflow.com/questions/18595981/improving-performance-of-multiplication-of-scipy-sparse-matrices
-
-
-@jit(nopython=True, nogil=True)
+@njit(nogil=True)
 def matvec2dense(m_ptr, m_ind, m_val, v_nnz, v_val, out):
     l = len(v_nnz)
     for j in range(l):
@@ -24,7 +26,7 @@ def matvec2dense(m_ptr, m_ind, m_val, v_nnz, v_val, out):
         out[m_ind[ind_start:ind_end]] += m_val[ind_start:ind_end] * v_val[j]
 
 
-@jit(nopython=True, nogil=True)
+@njit(nogil=True)
 def matvec2sparse(m_ptr, m_ind, m_val, v_nnz, v_val, sizes, indices, data):
     l = len(sizes) - 1
     for j in range(l):
@@ -64,7 +66,7 @@ def csc_matvec(mat_csc, vec, dense_output=True, dtype=None):
     return res
 
 
-@jit(nopython=True)
+@njit
 def _blockify(ind, ptr, major_dim):
     # convenient function to compute only diagonal
     # elements of the product of 2 matrices;
@@ -96,3 +98,28 @@ def inverse_permutation(p):
     s = np.empty(p.size, p.dtype)
     s[p] = np.arange(p.size)
     return s
+
+
+def unfold_tensor_coordinates(index, shape, mode):
+    # TODO implement direct calculation w/o intermediate flattening
+    modes = [m for m in [0, 1, 2] if m != mode] + [mode,]
+    mode_shape = tuple(shape[m] for m in modes)
+    mode_index = tuple(index[m] for m in modes)
+    flat_index = np.ravel_multi_index(mode_index, mode_shape)
+
+    unfold_shape = (mode_shape[0]*mode_shape[1], mode_shape[2])
+    unfold_index = np.unravel_index(flat_index, unfold_shape)
+    return unfold_index, unfold_shape
+
+
+def tensor_outer_at(vtarget, **kwargs):
+    @guvectorize(['void(float64[:], float64[:, :], float64[:, :], intp[:], intp[:], float64[:, :])'],
+                 '(),(i,m),(j,n),(),()->(m,n)',
+                 target=vtarget, nopython=True, **kwargs)
+    def tensor_outer_wrapped(val, v, w, i, j, res):
+        r1 = v.shape[1]
+        r2 = w.shape[1]
+        for m in range(r1):
+            for n in range(r2):
+                res[m, n] = val[0] * v[i[0], m] * w[j[0], n]
+    return tensor_outer_wrapped
diff --git a/polara/recommender/coldstart/models.py b/polara/recommender/coldstart/models.py
index 82b72c2..a2b3f6a 100644
--- a/polara/recommender/coldstart/models.py
+++ b/polara/recommender/coldstart/models.py
@@ -6,8 +6,8 @@ class ContentBasedColdStart(RecommenderModel):
     def __init__(self, *args, **kwargs):
         super(ContentBasedColdStart, self).__init__(*args, **kwargs)
         self.method = 'CB'
-        self._key = '{}_cold'.format(self.data.fields.itemid)
-        self._target = self.data.fields.userid
+        self._prediction_key = '{}_cold'.format(self.data.fields.itemid)
+        self._prediction_target = self.data.fields.userid
 
     def build(self):
         pass
diff --git a/polara/recommender/data.py b/polara/recommender/data.py
index d33076a..c026498 100644
--- a/polara/recommender/data.py
+++ b/polara/recommender/data.py
@@ -6,6 +6,7 @@
 from collections import defaultdict
 
 import pandas as pd
 import numpy as np
 
+from polara.lib.sparse import inverse_permutation
 from polara.recommender import defaults
@@ -19,6 +20,16 @@ def random_sample(df, frac, random_state):
         return df.sample(frac=frac, random_state=random_state)
 
 
+def group_largest_fraction(data, frac, groupid, by):
+    def return_order(a):
+        return np.take(range(1, len(a)+1), inverse_permutation(np.argsort(a)))
+
+    grouper = data.groupby(groupid, sort=False)[by]
+    ordered = grouper.transform(return_order)
+    largest = ordered.groupby(data[groupid], sort=False).transform(lambda x: x>round(frac*x.shape[0]))
+    return largest
+
+
 class EventNotifier(object):
     def __init__(self, events=None):
         self._subscribers = {}
@@ -91,17 +102,17 @@ class RecommenderData(object):
                            '_warm_start', '_holdout_size', '_test_sample',
                            '_permute_tops', '_random_holdout', '_negative_prediction'}
 
-    def __init__(self, data, userid, itemid, feedback, custom_order=None, seed=None):
+    def __init__(self, data, userid, itemid, feedback=None, custom_order=None, seed=None):
         self.name = None
         fields = [userid, itemid, feedback]
 
         if data is None:
-            cols = fields + [custom_order] if custom_order else fields
-            self._data = data = pd.DataFrame(columns=cols)
+            cols = fields + [custom_order]
+            self._data = data = pd.DataFrame(columns=[c for c in cols if c])
        else:
             self._data = data
-            if data.duplicated(subset=fields).any():
+            if data.duplicated(subset=[f for f in fields if f]).any():
                 # unstable in pandas v. 17.0, only works in <> v.17.0
                 # rely on deduplicated data in many places - makes data processing more efficient
                 raise NotImplementedError('Data has duplicate values')
@@ -208,7 +219,8 @@ def prepare(self):
         self._try_sort_test_data()
 
         if self.verbose:
-            print('Done.')
+            stats_msg = 'Done.\nThere are {} events in the training and {} events in the holdout.'
+            print(stats_msg.format(self.training.shape[0], self.test.holdout.shape[0]))
 
     def prepare_training_only(self):
         self.holdout_size = 0 # do not form holdout
@@ -384,8 +396,16 @@ def _split_data(self):
         if self._holdout_size >= 1: # state 2, sample holdout data per each user
             holdout = self._sample_holdout(test_split)
         elif self._holdout_size > 0: # state 2, special case - sample whole data at once
-            random_state = np.random.RandomState(self.seed)
-            holdout = self._data.sample(frac=self._holdout_size, random_state=random_state)
+            if self._random_holdout:
+                random_state = np.random.RandomState(self.seed)
+                holdout = self._data.sample(frac=self._holdout_size, random_state=random_state)
+            else:
+                # TODO custom groupid support, not only userid
+                group_id = self.fields.userid
+                order_id = self._custom_order or self.fields.feedback
+                frac = self._holdout_size
+                largest = group_largest_fraction(self._data, frac, group_id, order_id)
+                holdout = self._data.loc[largest].copy()
         else: # state 1
             holdout = None
@@ -396,7 +416,10 @@ def _split_data(self):
         self._test = namedtuple('TestData', 'testset holdout')._make([testset, holdout])
 
         if full_update:
-            self._training = self._data.loc[train_split, list(self.fields)]
+            fields = [f for f in list(self.fields) if f is not None]
+            if self._custom_order:
+                fields.append(self._custom_order)
+            self._training = self._data.loc[train_split, fields]
             self._notify(self.on_change_event)
         elif test_update:
             self._notify(self.on_update_event)
@@ -679,7 +702,11 @@ def _sample_holdout(self, test_split, group_id=None):
         if self._holdout_size >= 1: # pick at most _holdout_size elements
             holdout = grouper.nlargest(self._holdout_size, keep='last')
         else:
-            raise NotImplementedError
+            frac = self._holdout_size
+            def sample_largest(x):
+                size = round(frac*len(x))
+                return x.iloc[np.argpartition(x, -size)[-size:]]
+            holdout = grouper.apply(sample_largest)
 
         holdout_index = holdout.index.get_level_values(1)
         return self._data.loc[holdout_index]
@@ -716,11 +743,13 @@ def to_coo(self, tensor_mode=False):
             self.index = self.index._replace(feedback=feedback_transform)
 
             idx = np.hstack((user_item_data, new_feedback[:, np.newaxis]))
-            idx = np.ascontiguousarray(idx)
             val = np.ones(self.training.shape[0],)
         else:
             idx = user_item_data
-            val = self.training[feedback].values
+            if feedback is None:
+                val = np.ones(self.training.shape[0],)
+            else:
+                val = self.training[feedback].values
 
         shp = tuple(idx.max(axis=0) + 1)
         idx = idx.astype(np.intp)
@@ -735,8 +764,9 @@ def _recover_testset(self, update_data=False):
         if self.index.userid.training.new.isin(test_users).all():
             testset = self.training
         else:
-            testset = (self.training.query('{} in @test_users'.format(userid))
-                       .sort_values(userid))
+            testset = self.training.query('{} in @test_users'.format(userid))
+
+        testset = testset.sort_values(userid)
         if update_data:
             self._test = self._test._replace(testset=testset)
         return testset
@@ -754,16 +784,19 @@ def test_to_coo(self, tensor_mode=False):
         user_idx = testset[userid].values.astype(np.intp)
         item_idx = testset[itemid].values.astype(np.intp)
-        fdbk_val = testset[feedback].values
 
         if tensor_mode:
-            fdbk_idx = self.index.feedback.set_index('old').loc[fdbk_val, 'new'].values
-            if np.isnan(fdbk_idx).any():
+            fdbk_val = testset[feedback]
+            fdbk_idx = fdbk_val.map(self.index.feedback.set_index('old').new)
+            if fdbk_idx.isnull().any():
                 raise NotImplementedError('Not all values of feedback are present in training data')
-            else:
-                fdbk_idx = fdbk_idx.astype(np.intp)
+            fdbk_idx = fdbk_idx.values.astype(np.intp)
             test_coo = (user_idx, item_idx, fdbk_idx)
         else:
+            if feedback is None:
+                fdbk_val = np.ones(testset.shape[0],)
+            else:
+                fdbk_val = testset[feedback].values
             test_coo = (user_idx, item_idx, fdbk_val)
 
         return test_coo
@@ -792,7 +825,7 @@ def get_test_shape(self, tensor_mode=False):
 
     def set_test_data(self, testset=None, holdout=None, warm_start=False, test_users=None,
-                      reindex=True, ensure_consistency=True, copy=True):
+                      reindex=True, ensure_consistency=True, holdout_size=None, copy=True):
         '''Should be used only with custom data.'''
         if warm_start and ((testset is None) and (test_users is None)):
             raise ValueError('When warm_start is True, information about test users must be present. '
@@ -810,7 +843,10 @@
             holdout = holdout.copy() if holdout is not None else None
 
         if test_users is not None:
-            testset = self._data.loc[lambda x: x[self.fields.userid].isin(test_users), list(self.fields)]
+            fields = [f for f in list(self.fields) if f is not None]
+            if self._custom_order:
+                fields.append(self._custom_order)
+            testset = self._data.loc[lambda x: x[self.fields.userid].isin(test_users), fields]
 
             self._test = namedtuple('TestData', 'testset holdout')._make([testset, holdout])
             self.index = self.index._replace(userid=self.index.userid._replace(test=None))
@@ -819,7 +855,7 @@
         self._state = None
         self._last_update_rule = None
         self._test_ratio = -1
-        self._holdout_size = -1
+        self._holdout_size = holdout_size or -1
 
         self._notify(self.on_update_event)
         self._change_properties.clear()
diff --git a/polara/recommender/defaults.py b/polara/recommender/defaults.py
index b712fbc..161d728 100644
--- a/polara/recommender/defaults.py
+++ b/polara/recommender/defaults.py
@@ -25,6 +25,14 @@
 num_iters = 25
 show_output = False
 flattener = slice(0, None)
+test_vectorize_target = 'parallel'
+# from https://numba.pydata.org/numba-doc/dev/user/vectorize.html
+#The “cpu” target works well for small data sizes (approx. less than 1KB) and
+#low compute intensity algorithms. It has the least amount of overhead.
+#The “parallel” target works well for medium data sizes (approx. less than 1MB).
+#Threading adds a small delay. The “cuda” target works well for big data sizes
+#(approx. greater than 1MB) and high compute intensity algorithms.
+#Transferring memory to and from the GPU adds significant overhead.
 
 
 #RECOMMENDATIONS
@@ -35,9 +43,9 @@
 #EVALUATION
 ndcg_alternative = True #use exponential instead of linear relevance contribution in nDCG
 
-
 #COMPUTATION
 test_chunk_size = 1000 #to split tensor decompositions into smaller pieces in memory
+max_test_workers = None # to compute recommendations in parallel for groups of test users
 
 
 def get_config(params):
diff --git a/polara/recommender/external/implicit/ialswrapper.py b/polara/recommender/external/implicit/ialswrapper.py
index b728317..cb8ae9b 100644
--- a/polara/recommender/external/implicit/ialswrapper.py
+++ b/polara/recommender/external/implicit/ialswrapper.py
@@ -18,6 +18,7 @@ def __init__(self, *args, **kwargs):
         self.alpha = 1
         self.weight_func = np.log2
         self.regularization = 0.01
+        self.num_threads = 0
         self.num_epochs = 15
         self.method = 'iALS'
         self._model = None
@@ -45,7 +46,8 @@ def build(self):
         # define iALS model instance
         self._model = implicit.als.AlternatingLeastSquares(factors=self.rank,
                                                            regularization=self.regularization,
-                                                           iterations=self.num_epochs)
+                                                           iterations=self.num_epochs,
+                                                           num_threads=self.num_threads)
 
         # prepare input matrix for learning the model
         matrix = self.get_training_matrix() # user_by_item sparse matrix
@@ -60,25 +62,27 @@ def build(self):
     def get_recommendations(self):
         recalculate = self.data.warm_start # used to force folding-in computation
         if recalculate:
+            if self.filter_seen is False:
+                raise ValueError('The model always filters seen items from results.')
             # prepare test matrix with preferences of unseen users
             matrix, _ = self.get_test_matrix()
             matrix.data = self.confidence(matrix.data, alpha=self.alpha, weight=self.weight_func)
             num_users = matrix.shape[0]
             users_idx = range(num_users)
+
+            top_recs = np.empty((num_users, self.topk), dtype=np.intp)
+            for i, user_row in enumerate(users_idx):
+                recs = self._model.recommend(user_row, matrix, N=self.topk, recalculate_user=recalculate)
+                top_recs[i, :] = [item for item, _ in recs]
         else:
-            # prepare traing matrix and convert test user indices into
-            # corresponding training matrix rows
-            matrix = self.get_training_matrix()
-            userid = self.data.fields.userid
-            users_idx = self.data.test.holdout[userid].drop_duplicates(keep='first').values
-            num_users = len(users_idx)
+            top_recs = super(ImplicitALS, self).get_recommendations()
+        return top_recs
 
-        top_recs = np.empty((num_users, self.topk), dtype=np.intp)
-        if self.filter_seen is False:
-            raise ValueError('The model always filters seen items from results.')
 
+    def slice_recommendations(self, test_data, shape, start, stop, test_users=None):
+        slice_data = self._slice_test_data(test_data, start, stop)
 
-        for i, user_row in enumerate(users_idx):
-            recs = self._model.recommend(user_row, matrix, N=self.topk, recalculate_user=recalculate)
-            top_recs[i, :] = [item for item, _ in recs]
-        return top_recs
+        user_factors = self._model.user_factors[test_users[start:stop], :]
+        item_factors = self._model.item_factors
+        scores = user_factors.dot(item_factors.T)
+        return scores, slice_data
diff --git a/polara/recommender/models.py b/polara/recommender/models.py
index 1540247..c2e47ab 100644
--- a/polara/recommender/models.py
+++ b/polara/recommender/models.py
@@ -9,6 +9,9 @@
 from collections import namedtuple
 import warnings
 
+from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import as_completed
+
 import pandas as pd
 import numpy as np
 import scipy as sp
@@ -19,12 +22,12 @@
 from polara.recommender.evaluation import get_hits, get_relevance_scores, get_ranking_scores
 from polara.recommender.evaluation import get_hr_score, get_mrr_score
 from polara.recommender.evaluation import assemble_scoring_matrices
-from polara.recommender.utils import array_split, NNZ_MAX
+from polara.recommender.utils import array_split, get_nnz_max
 from polara.lib.hosvd import tucker_als
-from polara.lib.sparse import csc_matvec
+from polara.lib.sparse import csc_matvec, inverse_permutation
+from polara.lib.sparse import unfold_tensor_coordinates, tensor_outer_at
 from polara.tools.timing import Timer
 
-
 def get_default(name):
     return defaults.get_config([name])[name]
@@ -84,10 +87,11 @@ def __init__(self, recommender_data, switch_positive=None):
         # (in contrast to `on_feedback_level` argument of self.evaluate)
         self.switch_positive = switch_positive or get_default('switch_positive')
         self.verify_integrity = get_default('verify_integrity')
+        self.max_test_workers = get_default('max_test_workers')
 
-        # TODO sorting in data must be by self._key, also need to change get_test_data
-        self._key = self.data.fields.userid
-        self._target = self.data.fields.itemid
+        # TODO sorting in data must be by self._prediction_key, also need to change get_test_data
+        self._prediction_key = self.data.fields.userid
+        self._prediction_target = self.data.fields.itemid
 
         self._is_ready = False
         self.verbose = True
@@ -168,7 +172,7 @@ def _get_slices_idx(self, shape, result_width=None, scores_multiplier=None, dtyp
         if scores_multiplier is None:
             try:
                 fdbk_dim = self.factors.get(self.data.fields.feedback, None).shape
-                scores_multiplier = fdbk_dim[0] + 2*fdbk_dim[1]
+                scores_multiplier = fdbk_dim[1]
             except AttributeError:
                 scores_multiplier = 1
@@ -186,7 +190,7 @@ def _get_test_data(self):
         test_shape = self.data.get_test_shape(tensor_mode=tensor_mode)
 
         idx_diff = np.diff(user_idx)
-        # TODO sorting by self._key
+        # TODO sorting by self._prediction_key
         assert (idx_diff >= 0).all() # calculations assume testset is sorted by users!
 
         # TODO only required when testset consists of known users
@@ -221,7 +225,14 @@ def slice_recommendations(self, test_data, shape, start, stop, test_users=None):
     def _user_scores(self, i):
         # should not be exposed, designed for use within framework
         # operates on internal itemid's
-        test_data, test_shape, _ = self._get_test_data()
+        if not self._is_ready:
+            if self.verbose:
+                print('{} model is not ready. Rebuilding.'.format(self.method))
+            self.build()
+
+        test_data, test_shape, test_users = self._get_test_data()
+        if not self.data.warm_start:
+            i, = np.where(test_users == i)[0]
         scores, seen_idx = self.slice_recommendations(test_data, test_shape, i, i+1)
 
         if self.filter_seen:
@@ -240,8 +251,10 @@ def _make_user(self, user_info):
             items_data, feedback_data = zip(*user_info)
         except TypeError:
             items_data = user_info
-            feedback_val = self.data.training[feedback].max()
-            feedback_data = [feedback_val]*len(items_data)
+            feedback_data = {}
+            if feedback is not None:
+                feedback_val = self.data.training[feedback].max()
+                feedback_data = {feedback: [feedback_val]*len(items_data)}
 
         try:
             item_index = self.data.index.itemid.training
@@ -251,10 +264,9 @@ def _make_user(self, user_info):
         # need to convert itemid's to internal representation
         # conversion is not required for feedback (it's made in *to_coo functions, if needed)
         items_data = item_index.set_index('old').loc[items_data, 'new'].values
-        user_data = pd.DataFrame({userid: [0]*len(items_data),
-                                  itemid: items_data,
-                                  feedback: feedback_data})
-        return user_data
+        user_data = {userid: [0]*len(items_data), itemid: items_data}
+        user_data.update(feedback_data)
+        return pd.DataFrame(user_data)
 
 
     def show_recommendations(self, user_info, topk=None):
@@ -293,37 +305,56 @@ def show_recommendations(self, user_info, topk=None):
 
         return top_recs, seen_items
 
 
+    def _slice_recommender(self, user_slice, test_data, test_shape, test_users):
+        start, stop = user_slice
+        scores, slice_data = self.slice_recommendations(test_data, test_shape, start, stop, test_users)
+        if self.filter_seen:
+            # prevent seen items from appearing in recommendations
+            # NOTE: in case of sparse models (e.g. simple item-to-item)
+            # there's a risk of having seen items in recommendations list
+            # (for topk < i2i_matrix.shape[1]-len(unseen))
+            # this is related to low generalization ability
+            # of the naive cooccurrence method itself, not to the algorithm
+            self.downvote_seen_items(scores, slice_data)
+        top_recs = self.get_topk_elements(scores)
+        return top_recs
+
+
+    def run_parallel_recommender(self, result, user_slices, *args):
+        with ThreadPoolExecutor(max_workers=self.max_test_workers) as executor:
+            recs_futures = {executor.submit(self._slice_recommender,
+                                            user_slice, *args): user_slice
+                            for user_slice in user_slices}
+
+            for future in as_completed(recs_futures):
+                start, stop = recs_futures[future]
+                result[start:stop, :] = future.result()
+
+
+    def run_sequential_recommender(self, result, user_slices, *args):
+        for user_slice in user_slices:
+            start, stop = user_slice
+            result[start:stop, :] = self._slice_recommender(user_slice, *args)
+
+
     def get_recommendations(self):
         if self.verify_integrity:
             self.verify_data_integrity()
 
-        test_data, test_shape, test_users = self._get_test_data()
-
-        topk = self.topk
-        top_recs = np.empty((test_shape[0], topk), dtype=np.int64)
-
-        user_slices = self._get_slices_idx(test_shape)
-        start = user_slices[0]
-        for i in user_slices[1:]:
-            stop = i
-            scores, slice_data = self.slice_recommendations(test_data, test_shape, start, stop, test_users)
-
-            if self.filter_seen:
-                # prevent seen items from appearing in recommendations
-                # NOTE: in case of sparse models (e.g. simple item-to-item)
-                # there's a risk of having seen items in recommendations list
-                # (for topk < i2i_matrix.shape[1]-len(unseen))
-                # this is related to low generalization ability
-                # of the naive cooccurrence method itself, not to the algorithm
-                self.downvote_seen_items(scores, slice_data)
-
-            top_recs[start:stop, :] = self.get_topk_elements(scores)
-            start = stop
+        test_data = self._get_test_data()
+        test_shape = test_data[1]
+        user_slices_idx = self._get_slices_idx(test_shape)
+        user_slices = zip(user_slices_idx[:-1], user_slices_idx[1:])
+        top_recs = np.empty((test_shape[0], self.topk), dtype=np.int64)
+        if self.max_test_workers and len(user_slices_idx) > 2:
+            self.run_parallel_recommender(top_recs, user_slices, *test_data)
+        else:
+            self.run_sequential_recommender(top_recs, user_slices, *test_data)
         return top_recs
 
 
-    def evaluate(self, method='hits', topk=None, not_rated_penalty=None, on_feedback_level=None):
+    def evaluate(self, method='hits', topk=None, not_rated_penalty=None, on_feedback_level=None, ignore_feedback=False, simple_rates=False):
         feedback = self.data.fields.feedback
         if int(topk or 0) > self.topk:
             self.topk = topk # will also flush old recommendations
@@ -332,7 +363,7 @@
         recommendations = self.recommendations[:, :topk] # will recalculate if empty
 
         eval_data = self.data.test.holdout
-        if self.switch_positive is None:
+        if (self.switch_positive is None) or (feedback is None):
             # all recommendations are considered positive predictions
             # this is a proper setting for binary data problems (implicit feedback)
            # in this case all unrated items, recommended by an algorithm
@@ -348,17 +379,18 @@
             not_rated_penalty = not_rated_penalty or 0
             is_positive = (eval_data[feedback] >= self.switch_positive).values
 
+        feedback = None if ignore_feedback else feedback
         scoring_data = assemble_scoring_matrices(recommendations, eval_data,
-                                                 self._key, self._target,
+                                                 self._prediction_key, self._prediction_target,
                                                  is_positive, feedback=feedback)
 
         if method == 'relevance': # no need for feedback
-            if self.data.holdout_size == 1:
+            if (self.data.holdout_size == 1) or simple_rates:
                 scores = get_hr_score(scoring_data[1])
             else:
                 scores = get_relevance_scores(*scoring_data, not_rated_penalty=not_rated_penalty)
         elif method == 'ranking':
-            if self.data.holdout_size == 1:
+            if (self.data.holdout_size == 1) or simple_rates:
                 scores = get_mrr_score(scoring_data[1])
             else:
                 ndcg_alternative = get_default('ndcg_alternative')
@@ -615,7 +647,7 @@ def _sparse_dot(self, tst_mat, i2i_mat):
         scores = tst_mat.dot(i2i_mat.T) # NOTE even though not neccessary for symmetric i2i matrix,
                                         # transpose helps to avoid expensive conversion to CSR (performed by scipy)
-        if scores.nnz > NNZ_MAX:
+        if scores.nnz > get_nnz_max():
             # too many nnz lead to undesired memory overhead in downvote_seen_items
             scores = scores.toarray(order='C')
         return scores
@@ -662,6 +694,7 @@ def _check_reduced_rank(self, rank):
                 self.factors = dict.fromkeys(self.factors.keys())
                 break
             else:
+                # ellipsis allows to handle 1d array of singular values
                 self.factors[entity] = factor[..., :rank]
 
@@ -698,7 +731,7 @@ class CoffeeModel(RecommenderModel):
     def __init__(self, *args, **kwargs):
         super(CoffeeModel, self).__init__(*args, **kwargs)
-        self.mlrank = defaults.mlrank
+        self._mlrank = defaults.mlrank
         self.factors = {}
         self.chunk = defaults.test_chunk_size
         self.method = 'CoFFee'
@@ -706,7 +739,20 @@ def __init__(self, *args, **kwargs):
         self.growth_tol = defaults.growth_tol
         self.num_iters = defaults.num_iters
         self.show_output = defaults.show_output
+        self.seed = None
+        self._vectorize_target = defaults.test_vectorize_target
+
+
+    @property
+    def mlrank(self):
+        return self._mlrank
+    @mlrank.setter
+    def mlrank(self, new_value):
+        if new_value != self._mlrank:
+            self._mlrank = new_value
+            self._check_reduced_rank(new_value)
+            self._recommendations = None
 
     @property
     def flattener(self):
@@ -719,6 +765,46 @@ def flattener(self, new_value):
             self._flattener = new_value
             self._recommendations = None
 
+    @property
+    def tensor_outer_at(self):
+        vtarget = self._vectorize_target.lower()
+        if self.max_test_workers and (vtarget == 'parallel'):
+            # force single thread for tensor_outer_at to safely run in parallel
+            vtarget = 'cpu'
+        return tensor_outer_at(vtarget)
+
+
+    def _check_reduced_rank(self, mlrank):
+        for mode, entity in enumerate(self.data.fields):
+            factor = self.factors.get(entity, None)
+            if factor is None:
+                continue
+
+            rank = mlrank[mode]
+            if factor.shape[1] < rank:
+                self._is_ready = False
+                self.factors = {}
+                break
+            elif factor.shape[1] == rank:
+                continue
+            else:
+                rfactor, new_core = self.round_core(self.factors['core'], mode, rank)
+                self.factors[entity] = factor.dot(rfactor)
+                self.factors['core'] = new_core
+
+
+    @staticmethod
+    def round_core(core, mode, rank):
+        new_dims = [mode] + [m for m in range(core.ndim) if m!=mode]
+        mode_dim = core.shape[mode]
+        flat_core = core.transpose(new_dims).reshape((mode_dim, -1), order='F')
+        u, s, vt = np.linalg.svd(flat_core, full_matrices=False)
+        rfactor = u[:, :rank]
+        new_core = (np.ascontiguousarray(s[:rank, np.newaxis]*vt[:rank, :])
+                    .reshape(rank, *[core.shape[i] for i in new_dims[1:]], order='F')
+                    .transpose(inverse_permutation(np.array(new_dims))))
+        return rfactor, new_core
+
 
     @staticmethod
     def flatten_scores(tensor_scores, flattener=None):
@@ -726,19 +812,19 @@ def flatten_scores(tensor_scores, flattener=None):
         if isinstance(flattener, str):
             slicer = slice(None)
             flatten = getattr(np, flattener)
-            matrix_scores = flatten(tensor_scores[:, :, slicer], axis=-1)
+            matrix_scores = flatten(tensor_scores[..., slicer], axis=-1)
         elif isinstance(flattener, int):
             slicer = flattener
-            matrix_scores = tensor_scores[:, :, slicer]
+            matrix_scores = tensor_scores[..., slicer]
         elif isinstance(flattener, (list, slice)):
             slicer = flattener
             flatten = np.sum
-            matrix_scores = flatten(tensor_scores[:, :, slicer], axis=-1)
+            matrix_scores = flatten(tensor_scores[..., slicer], axis=-1)
         elif isinstance(flattener, tuple):
             slicer, flatten_method = flattener
             slicer = slicer or slice(None)
             flatten = getattr(np, flatten_method)
-            matrix_scores = flatten(tensor_scores[:, :, slicer], axis=-1)
+            matrix_scores = flatten(tensor_scores[..., slicer], axis=-1)
         elif callable(flattener):
             matrix_scores = flattener(tensor_scores)
         else:
@@ -754,7 +840,8 @@ def build(self):
             tucker_als(idx, val, shp, self.mlrank,
                        growth_tol=self.growth_tol,
                        iters=self.num_iters,
-                       batch_run=not self.show_output)
+                       batch_run=not self.show_output,
+                       seed=self.seed)
 
         self.factors[self.data.fields.userid] = users_factors
         self.factors[self.data.fields.itemid] = items_factors
@@ -762,36 +849,33 @@ def build(self):
         self.factors['core'] = core
 
 
-    def get_test_tensor(self, test_data, shape, start, end):
-        slice_idx = self._slice_test_data(test_data, start, end)
+    def unfold_test_tensor_slice(self, test_data, shape, start, stop, mode):
+        slice_idx = self._slice_test_data(test_data, start, stop)
 
-        num_users = end - start
+        num_users = stop - start
         num_items = shape[1]
         num_fdbks = shape[2]
         slice_shp = (num_users, num_items, num_fdbks)
 
-        idx_flat = np.ravel_multi_index(slice_idx, slice_shp)
-        shp_flat = (num_users*num_items, num_fdbks)
-        idx = np.unravel_index(idx_flat, shp_flat)
-        val = np.ones_like(slice_idx[2])
+        idx, shp = unfold_tensor_coordinates(slice_idx, slice_shp, mode)
+        val = np.ones_like(slice_idx[2], dtype=np.uint8)
 
-        test_tensor_unfolded = csr_matrix((val, idx), shape=shp_flat, dtype=val.dtype)
+        test_tensor_unfolded = csr_matrix((val, idx), shape=shp, dtype=val.dtype)
         return test_tensor_unfolded, slice_idx
 
 
     def slice_recommendations(self, test_data, shape, start, stop, test_users=None):
-        test_tensor_unfolded, slice_idx = self.get_test_tensor(test_data, shape, start, stop)
+        slice_idx = self._slice_test_data(test_data, start, stop)
+
         v = self.factors[self.data.fields.itemid]
         w = self.factors[self.data.fields.feedback]
 
-        num_users = stop - start
-        num_items = shape[1]
+        # use the fact that test data is sorted by users for reduction:
+        scores = self.tensor_outer_at(1.0, v, w, slice_idx[1], slice_idx[2])
+        scores = np.add.reduceat(scores, np.r_[0, np.where(np.diff(slice_idx[0]))[0]+1])
 
-        # assume that w.shape[1] < v.shape[1] (allows for more efficient calculations)
-        scores = test_tensor_unfolded.dot(w).reshape(num_users, num_items, w.shape[1])
-        scores = np.tensordot(scores, v, axes=(1, 0))
-        scores = np.tensordot(np.tensordot(scores, v, axes=(2, 1)), w, axes=(1, 1))
-        scores = self.flatten_scores(scores, self.flattener)
+        wt_flat = self.flatten_scores(w.T, self.flattener) # TODO cache result
+        scores = np.tensordot(scores, wt_flat, axes=(2, 0)).dot(v.T)
         return scores, slice_idx
 
     # additional functionality: rating pediction
diff --git a/polara/recommender/utils.py b/polara/recommender/utils.py
index a9805e9..c403048 100644
--- a/polara/recommender/utils.py
+++ b/polara/recommender/utils.py
@@ -16,7 +16,8 @@
 # size of list of tuples of indices - to estimate when to convert sparse matrix to dense
 # based on http://stackoverflow.com/questions/15641344/python-memory-consumption-dict-vs-list-of-tuples
 # and https://code.tutsplus.com/tutorials/understand-how-much-memory-your-python-objects-use--cms-25609
-NNZ_MAX = int(MEMORY_HARD_LIMIT * (1024**3) / (tuplsize + 2*(pntrsize + itemsize)))
+def get_nnz_max():
+    return int(MEMORY_HARD_LIMIT * (1024**3) / (tuplsize + 2*(pntrsize + itemsize)))
 
 
 def range_division(length, fit_size):
diff --git a/polara/tools/printing.py b/polara/tools/display.py
similarity index 100%
rename from polara/tools/printing.py
rename to polara/tools/display.py
diff --git a/polara/tools/timing.py b/polara/tools/timing.py
index bcc4f8b..7395138 100644
--- a/polara/tools/timing.py
+++ b/polara/tools/timing.py
@@ -1,7 +1,7 @@
 from timeit import default_timer as timer
 
 
-class Timer():
+class Timer(object):
     def __init__(self, model_name='Model', verbose=True, msg=None):
         self.model_name = model_name
         self.message = msg or '{} training time: {}s'
diff --git a/setup.py b/setup.py
index a59b6d9..eb47339 100644
--- a/setup.py
+++ b/setup.py
@@ -15,14 +15,17 @@
 
 opts = dict(name="polara",
-            description="Fast and flexible recommender framework",
+            description="Fast and flexible recommender system framework",
            keywords = "recommender system",
-            version = "0.5.3",
+            version = "0.6.0",
            license="MIT",
            author="Evgeny Frolov",
            platforms=["any"],
            packages=_packages)
 
+extras = dict(install_requires=['futures; python_version=="2.7"'])
+
+opts.update(extras)
 
 if __name__ == '__main__':
    setup(**opts)
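
A minimal, illustrative sketch (not part of the patch itself) of how the options introduced above could be exercised; the archive name and the hyperparameter values below are placeholder assumptions, while the function signatures follow the diff:

# Hypothetical usage of the new loader flags and the random grid helper.
from polara.datasets.movielens import get_movielens_data
from polara.evaluation.pipelines import random_grid

# 'ml-1m.zip' is an assumed local copy of a MovieLens archive.
ratings, tags = get_movielens_data('ml-1m.zip', get_tags=True, include_time=True)

# Placeholder hyperparameter grid; random_grid returns a set of sampled parameter tuples.
params = {'rank': [10, 20, 30], 'num_epochs': [10, 15]}
sampled_configs = random_grid(params, n=4)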