0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import array
0019 import sys
0020 from collections import namedtuple
0021
0022 from pyspark import SparkContext, since
0023 from pyspark.rdd import RDD
0024 from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc
0025 from pyspark.mllib.util import JavaLoader, JavaSaveable
0026 from pyspark.sql import DataFrame
0027
# Public API of this module: the names exported by ``from <this module> import *``.
__all__ = ['MatrixFactorizationModel', 'ALS', 'Rating']
0029
0030
class Rating(namedtuple("Rating", "user product rating")):
    """
    An immutable (user, product, rating) triple.

    Fields are reachable both by name and by position:

    >>> r = Rating(1, 2, 5.0)
    >>> (r.user, r.product, r.rating)
    (1, 2, 5.0)
    >>> (r[0], r[1], r[2])
    (1, 2, 5.0)

    .. versionadded:: 1.2.0
    """

    def __reduce__(self):
        # Pickle as plain (int, int, float) so an unpickled Rating always holds
        # these Python types, whatever values were originally stored.
        coerced = (int(self[0]), int(self[1]), float(self[2]))
        return (Rating, coerced)
0046
0047
@inherit_doc
class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):

    """A matrix factorization model trained by regularized alternating
    least-squares.

    >>> r1 = (1, 1, 1.0)
    >>> r2 = (1, 2, 2.0)
    >>> r3 = (2, 1, 2.0)
    >>> ratings = sc.parallelize([r1, r2, r3])
    >>> model = ALS.trainImplicit(ratings, 1, seed=10)
    >>> model.predict(2, 2)
    0.4...

    >>> testset = sc.parallelize([(1, 2), (1, 1)])
    >>> model = ALS.train(ratings, 2, seed=0)
    >>> model.predictAll(testset).collect()
    [Rating(user=1, product=1, rating=1.0...), Rating(user=1, product=2, rating=1.9...)]

    >>> model = ALS.train(ratings, 4, seed=10)
    >>> model.userFeatures().collect()
    [(1, array('d', [...])), (2, array('d', [...]))]

    >>> model.recommendUsers(1, 2)
    [Rating(user=2, product=1, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
    >>> model.recommendProducts(1, 2)
    [Rating(user=1, product=2, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
    >>> model.rank
    4

    >>> first_user = model.userFeatures().take(1)[0]
    >>> latents = first_user[1]
    >>> len(latents)
    4

    >>> model.productFeatures().collect()
    [(1, array('d', [...])), (2, array('d', [...]))]

    >>> first_product = model.productFeatures().take(1)[0]
    >>> latents = first_product[1]
    >>> len(latents)
    4

    >>> products_for_users = model.recommendProductsForUsers(1).collect()
    >>> len(products_for_users)
    2
    >>> products_for_users[0]
    (1, (Rating(user=1, product=2, rating=...),))

    >>> users_for_products = model.recommendUsersForProducts(1).collect()
    >>> len(users_for_products)
    2
    >>> users_for_products[0]
    (1, (Rating(user=2, product=1, rating=...),))

    >>> model = ALS.train(ratings, 1, nonnegative=True, seed=123456789)
    >>> model.predict(2, 2)
    3.73...

    >>> df = sqlContext.createDataFrame([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)])
    >>> model = ALS.train(df, 1, nonnegative=True, seed=123456789)
    >>> model.predict(2, 2)
    3.73...

    >>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=123456789)
    >>> model.predict(2, 2)
    0.4...

    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = MatrixFactorizationModel.load(sc, path)
    >>> sameModel.predict(2, 2)
    0.4...
    >>> sameModel.predictAll(testset).collect()
    [Rating(...
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 0.9.0
    """
    @since("0.9.0")
    def predict(self, user, product):
        """
        Predicts rating for the given user and product.

        :param user:
          User id; converted with ``int()`` before being sent to the JVM.
        :param product:
          Product id; converted with ``int()`` before being sent to the JVM.
        :return:
          The predicted rating.
        """
        return self._java_model.predict(int(user), int(product))

    @since("0.9.0")
    def predictAll(self, user_product):
        """
        Returns an RDD of predicted ratings (`Rating` objects) for the input
        RDD of (user, product) pairs.

        :param user_product:
          RDD whose elements are (user, product) pairs. Only the first element
          is checked for length 2; every id is converted with ``int()``.
        """
        # NOTE(review): these asserts are skipped under ``python -O``; they are
        # kept (rather than raising TypeError) to preserve the existing
        # AssertionError contract for callers.
        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
        # Triggers a Spark job to peek at one element for shape validation.
        first = user_product.first()
        assert len(first) == 2, "user_product should be RDD of (user, product)"
        user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1])))
        return self.call("predict", user_product)

    @since("1.2.0")
    def userFeatures(self):
        """
        Returns a paired RDD, where the first element is the user and the
        second is an array of features corresponding to that user.
        """
        # Feature vectors are converted to ``array.array('d', ...)`` (doubles).
        return self.call("getUserFeatures").mapValues(lambda v: array.array('d', v))

    @since("1.2.0")
    def productFeatures(self):
        """
        Returns a paired RDD, where the first element is the product and the
        second is an array of features corresponding to that product.
        """
        # Feature vectors are converted to ``array.array('d', ...)`` (doubles).
        return self.call("getProductFeatures").mapValues(lambda v: array.array('d', v))

    @since("1.4.0")
    def recommendUsers(self, product, num):
        """
        Recommends the top "num" number of users for a given product and
        returns a list of Rating objects sorted by the predicted rating in
        descending order.

        :param product:
          Product id; converted with ``int()`` as in `predict`.
        :param num:
          Maximum number of users to recommend; converted with ``int()``.
        """
        return list(self.call("recommendUsers", int(product), int(num)))

    @since("1.4.0")
    def recommendProducts(self, user, num):
        """
        Recommends the top "num" number of products for a given user and
        returns a list of Rating objects sorted by the predicted rating in
        descending order.

        :param user:
          User id; converted with ``int()`` as in `predict`.
        :param num:
          Maximum number of products to recommend; converted with ``int()``.
        """
        return list(self.call("recommendProducts", int(user), int(num)))

    @since("1.4.0")
    def recommendProductsForUsers(self, num):
        """
        Recommends the top "num" number of products for all users. The
        number of recommendations returned per user may be less than "num".

        :param num:
          Maximum number of products per user; converted with ``int()``.
        :return:
          A paired RDD of (user, tuple of `Rating`).
        """
        return self.call("wrappedRecommendProductsForUsers", int(num))

    @since("1.4.0")
    def recommendUsersForProducts(self, num):
        """
        Recommends the top "num" number of users for all products. The
        number of recommendations returned per product may be less than
        "num".

        :param num:
          Maximum number of users per product; converted with ``int()``.
        :return:
          A paired RDD of (product, tuple of `Rating`).
        """
        return self.call("wrappedRecommendUsersForProducts", int(num))

    @property
    @since("1.4.0")
    def rank(self):
        """Rank for the features in this model"""
        return self.call("rank")

    @classmethod
    @since("1.3.1")
    def load(cls, sc, path):
        """Load a model from the given path"""
        model = cls._load_java(sc, path)
        wrapper = sc._jvm.org.apache.spark.mllib.api.python.MatrixFactorizationModelWrapper(model)
        # Use ``cls`` so that loading through a subclass yields that subclass.
        return cls(wrapper)
0213
0214
class ALS(object):
    """Alternating Least Squares matrix factorization

    .. versionadded:: 0.9.0
    """

    @classmethod
    def _prepare(cls, ratings):
        """
        Normalize training input into an RDD of `Rating`.

        :param ratings:
          An RDD, or a DataFrame (its ``.rdd`` is used).
        :return:
          An RDD of `Rating`. If the first element is already a `Rating` the
          RDD is passed through unchanged; if it is a tuple or list, every
          element is unpacked into ``Rating(*x)``.
        :raises TypeError:
          If `ratings` is neither an RDD nor a DataFrame, or its first element
          is neither a `Rating` nor a tuple/list.
        """
        if isinstance(ratings, RDD):
            pass
        elif isinstance(ratings, DataFrame):
            ratings = ratings.rdd
        else:
            raise TypeError("Ratings should be represented by either an RDD or a DataFrame, "
                            "but got %s." % type(ratings))
        # Only the first element is inspected (this runs a Spark job); later
        # elements are assumed to share its form.
        first = ratings.first()
        if isinstance(first, Rating):
            pass
        elif isinstance(first, (tuple, list)):
            ratings = ratings.map(lambda x: Rating(*x))
        else:
            raise TypeError("Expect a Rating or a tuple/list, but got %s." % type(first))
        return ratings

    @classmethod
    @since("0.9.0")
    def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False,
              seed=None):
        """
        Train a matrix factorization model given an RDD (or DataFrame) of
        ratings by users for a subset of products. The ratings matrix is
        approximated as the product of two lower-rank matrices of a given
        rank (number of features). To solve for these features, ALS is run
        iteratively with a configurable level of parallelism.

        :param ratings:
          RDD of `Rating` or (userID, productID, rating) tuple, or a
          DataFrame whose rows have that (user, product, rating) layout.
        :param rank:
          Number of features to use (also referred to as the number of latent factors).
        :param iterations:
          Number of iterations of ALS.
          (default: 5)
        :param lambda_:
          Regularization parameter. (Trailing underscore because ``lambda``
          is a Python keyword.)
          (default: 0.01)
        :param blocks:
          Number of blocks used to parallelize the computation. A value
          of -1 will use an auto-configured number of blocks.
          (default: -1)
        :param nonnegative:
          A value of True will solve least-squares with nonnegativity
          constraints.
          (default: False)
        :param seed:
          Random seed for initial matrix factorization model. A value
          of None will use system time as the seed.
          (default: None)
        :return:
          A trained `MatrixFactorizationModel`.
        """
        model = callMLlibFunc("trainALSModel", cls._prepare(ratings), rank, iterations,
                              lambda_, blocks, nonnegative, seed)
        return MatrixFactorizationModel(model)

    @classmethod
    @since("0.9.0")
    def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01,
                      nonnegative=False, seed=None):
        """
        Train a matrix factorization model given an RDD (or DataFrame) of
        'implicit preferences' of users for a subset of products. The
        ratings matrix is approximated as the product of two lower-rank
        matrices of a given rank (number of features). To solve for these
        features, ALS is run iteratively with a configurable level of
        parallelism.

        :param ratings:
          RDD of `Rating` or (userID, productID, rating) tuple, or a
          DataFrame whose rows have that (user, product, rating) layout.
        :param rank:
          Number of features to use (also referred to as the number of latent factors).
        :param iterations:
          Number of iterations of ALS.
          (default: 5)
        :param lambda_:
          Regularization parameter. (Trailing underscore because ``lambda``
          is a Python keyword.)
          (default: 0.01)
        :param blocks:
          Number of blocks used to parallelize the computation. A value
          of -1 will use an auto-configured number of blocks.
          (default: -1)
        :param alpha:
          A constant used in computing confidence.
          (default: 0.01)
        :param nonnegative:
          A value of True will solve least-squares with nonnegativity
          constraints.
          (default: False)
        :param seed:
          Random seed for initial matrix factorization model. A value
          of None will use system time as the seed.
          (default: None)
        :return:
          A trained `MatrixFactorizationModel`.
        """
        model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank,
                              iterations, lambda_, blocks, alpha, nonnegative, seed)
        return MatrixFactorizationModel(model)
0317
0318
def _test():
    """
    Run this module's doctests against a local SparkContext.

    The doctests see the module's globals plus ``sc`` (a ``local[4]``
    SparkContext) and ``sqlContext``. Exits with ``sys.exit(-1)`` if any
    doctest fails.
    """
    import doctest
    import pyspark.mllib.recommendation
    from pyspark.sql import SQLContext
    globs = pyspark.mllib.recommendation.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        globs['sc'] = sc
        globs['sqlContext'] = SQLContext(sc)
        failure_count, _ = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Always release the SparkContext, even if setup or testmod raises.
        sc.stop()
    if failure_count:
        sys.exit(-1)
0331
0332
# Running this file directly executes its doctests via _test().
if __name__ == "__main__":
    _test()