Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import array
0019 import sys
0020 from collections import namedtuple
0021 
0022 from pyspark import SparkContext, since
0023 from pyspark.rdd import RDD
0024 from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc
0025 from pyspark.mllib.util import JavaLoader, JavaSaveable
0026 from pyspark.sql import DataFrame
0027 
# Public names exported by a star-import of this module.
__all__ = ['MatrixFactorizationModel', 'ALS', 'Rating']
0029 
0030 
class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    """
    A single (user, product, rating) observation.

    Fields are reachable both by name and by position:

    >>> r = Rating(1, 2, 5.0)
    >>> (r.user, r.product, r.rating)
    (1, 2, 5.0)
    >>> (r[0], r[1], r[2])
    (1, 2, 5.0)

    .. versionadded:: 1.2.0
    """

    def __reduce__(self):
        # Pickling hook: rebuild through the Rating constructor, with the
        # fields normalised to built-in int / int / float.
        user, product, rating = self
        return Rating, (int(user), int(product), float(rating))
0046 
0047 
@inherit_doc
class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):

    """A matrix factorization model trained by regularized alternating
    least-squares.

    >>> r1 = (1, 1, 1.0)
    >>> r2 = (1, 2, 2.0)
    >>> r3 = (2, 1, 2.0)
    >>> ratings = sc.parallelize([r1, r2, r3])
    >>> model = ALS.trainImplicit(ratings, 1, seed=10)
    >>> model.predict(2, 2)
    0.4...

    >>> testset = sc.parallelize([(1, 2), (1, 1)])
    >>> model = ALS.train(ratings, 2, seed=0)
    >>> model.predictAll(testset).collect()
    [Rating(user=1, product=1, rating=1.0...), Rating(user=1, product=2, rating=1.9...)]

    >>> model = ALS.train(ratings, 4, seed=10)
    >>> model.userFeatures().collect()
    [(1, array('d', [...])), (2, array('d', [...]))]

    >>> model.recommendUsers(1, 2)
    [Rating(user=2, product=1, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
    >>> model.recommendProducts(1, 2)
    [Rating(user=1, product=2, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
    >>> model.rank
    4

    >>> first_user = model.userFeatures().take(1)[0]
    >>> latents = first_user[1]
    >>> len(latents)
    4

    >>> model.productFeatures().collect()
    [(1, array('d', [...])), (2, array('d', [...]))]

    >>> first_product = model.productFeatures().take(1)[0]
    >>> latents = first_product[1]
    >>> len(latents)
    4

    >>> products_for_users = model.recommendProductsForUsers(1).collect()
    >>> len(products_for_users)
    2
    >>> products_for_users[0]
    (1, (Rating(user=1, product=2, rating=...),))

    >>> users_for_products = model.recommendUsersForProducts(1).collect()
    >>> len(users_for_products)
    2
    >>> users_for_products[0]
    (1, (Rating(user=2, product=1, rating=...),))

    >>> model = ALS.train(ratings, 1, nonnegative=True, seed=123456789)
    >>> model.predict(2, 2)
    3.73...

    >>> df = sqlContext.createDataFrame([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)])
    >>> model = ALS.train(df, 1, nonnegative=True, seed=123456789)
    >>> model.predict(2, 2)
    3.73...

    >>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=123456789)
    >>> model.predict(2, 2)
    0.4...

    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = MatrixFactorizationModel.load(sc, path)
    >>> sameModel.predict(2, 2)
    0.4...
    >>> sameModel.predictAll(testset).collect()
    [Rating(...
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 0.9.0
    """
    @since("0.9.0")
    def predict(self, user, product):
        """
        Predicts rating for the given user and product.

        :param user:
          User id; coerced with ``int()``.
        :param product:
          Product id; coerced with ``int()``.
        :return:
          Whatever the wrapped Java model's ``predict`` returns for the pair.
        """
        return self._java_model.predict(int(user), int(product))

    @since("0.9.0")
    def predictAll(self, user_product):
        """
        Returns an RDD of predicted ratings for input user and product
        pairs.

        :param user_product:
          RDD of (user, product) pairs. Both elements of every pair are
          coerced with ``int()``. An empty RDD is passed through without
          validation.
        :return:
          The result of the model's "predict" call on the coerced pairs.
        :raises AssertionError:
          If `user_product` is not an RDD, or its first element does not
          have exactly two items.
        """
        # The checks raise explicitly instead of using ``assert`` so they are
        # not stripped under ``python -O``; AssertionError is kept so callers
        # observe the same exception type as before.
        if not isinstance(user_product, RDD):
            raise AssertionError("user_product should be RDD of (user, product)")
        # take(1) rather than first(): an empty RDD has nothing to validate,
        # and first() would raise on it before any prediction is attempted.
        head = user_product.take(1)
        if head and len(head[0]) != 2:
            raise AssertionError("user_product should be RDD of (user, product)")
        pairs = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1])))
        return self.call("predict", pairs)

    @since("1.2.0")
    def userFeatures(self):
        """
        Returns a paired RDD, where the first element is the user and the
        second is an array of features corresponding to that user.

        Each feature vector is converted to an ``array.array`` with type
        code ``'d'``.
        """
        return self.call("getUserFeatures").mapValues(lambda v: array.array('d', v))

    @since("1.2.0")
    def productFeatures(self):
        """
        Returns a paired RDD, where the first element is the product and the
        second is an array of features corresponding to that product.

        Each feature vector is converted to an ``array.array`` with type
        code ``'d'``.
        """
        return self.call("getProductFeatures").mapValues(lambda v: array.array('d', v))

    @since("1.4.0")
    def recommendUsers(self, product, num):
        """
        Recommends the top "num" number of users for a given product and
        returns a list of Rating objects sorted by the predicted rating in
        descending order.

        :param product:
          Product id; coerced with ``int()``.
        :param num:
          Number of users to recommend; coerced with ``int()``.
        """
        # Coerce exactly as predict() does so every method of the model
        # treats integer-like ids the same way.
        return list(self.call("recommendUsers", int(product), int(num)))

    @since("1.4.0")
    def recommendProducts(self, user, num):
        """
        Recommends the top "num" number of products for a given user and
        returns a list of Rating objects sorted by the predicted rating in
        descending order.

        :param user:
          User id; coerced with ``int()``.
        :param num:
          Number of products to recommend; coerced with ``int()``.
        """
        # Coerce exactly as predict() does so every method of the model
        # treats integer-like ids the same way.
        return list(self.call("recommendProducts", int(user), int(num)))

    def recommendProductsForUsers(self, num):
        """
        Recommends the top "num" number of products for all users. The
        number of recommendations returned per user may be less than "num".

        :param num:
          Number of products to recommend per user; coerced with ``int()``.
        :return:
          An RDD whose collected elements are (user, tuple of Rating) pairs.
        """
        return self.call("wrappedRecommendProductsForUsers", int(num))

    def recommendUsersForProducts(self, num):
        """
        Recommends the top "num" number of users for all products. The
        number of recommendations returned per product may be less than
        "num".

        :param num:
          Number of users to recommend per product; coerced with ``int()``.
        :return:
          An RDD whose collected elements are (product, tuple of Rating)
          pairs.
        """
        return self.call("wrappedRecommendUsersForProducts", int(num))

    @property
    @since("1.4.0")
    def rank(self):
        """Rank for the features in this model"""
        return self.call("rank")

    @classmethod
    @since("1.3.1")
    def load(cls, sc, path):
        """
        Load a model from the given path.

        :param sc:
          SparkContext used to load the model and to reach the JVM.
        :param path:
          Location the model was previously saved to.
        :return:
          A MatrixFactorizationModel backed by a JVM-side
          MatrixFactorizationModelWrapper around the loaded Java model.
        """
        model = cls._load_java(sc, path)
        wrapper = sc._jvm.org.apache.spark.mllib.api.python.MatrixFactorizationModelWrapper(model)
        return MatrixFactorizationModel(wrapper)
0213 
0214 
class ALS(object):
    """Alternating Least Squares matrix factorization

    .. versionadded:: 0.9.0
    """

    @classmethod
    def _prepare(cls, ratings):
        # Normalise the input into an RDD of Rating objects.
        if not isinstance(ratings, RDD):
            if isinstance(ratings, DataFrame):
                ratings = ratings.rdd
            else:
                raise TypeError(
                    "Ratings should be represented by either an RDD or a DataFrame, "
                    "but got %s." % type(ratings))

        # Only the first element is inspected to decide whether a conversion
        # is needed. Rating must be tested before the generic tuple/list
        # case because Rating is itself a tuple subclass.
        sample = ratings.first()
        if isinstance(sample, Rating):
            return ratings
        if isinstance(sample, (tuple, list)):
            return ratings.map(lambda row: Rating(*row))
        raise TypeError("Expect a Rating or a tuple/list, but got %s." % type(sample))

    @classmethod
    @since("0.9.0")
    def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, nonnegative=False,
              seed=None):
        """
        Fit a matrix factorization model to explicit ratings given by users
        to a subset of products. The ratings matrix is approximated by the
        product of two lower-rank matrices of the requested rank (number of
        features); ALS is run iteratively, with a configurable level of
        parallelism, to solve for those features.

        :param ratings:
          RDD of `Rating` or (userID, productID, rating) tuple.
        :param rank:
          Number of features to use (also referred to as the number of latent factors).
        :param iterations:
          Number of iterations of ALS.
          (default: 5)
        :param lambda_:
          Regularization parameter.
          (default: 0.01)
        :param blocks:
          Number of blocks used to parallelize the computation. A value
          of -1 will use an auto-configured number of blocks.
          (default: -1)
        :param nonnegative:
          A value of True will solve least-squares with nonnegativity
          constraints.
          (default: False)
        :param seed:
          Random seed for initial matrix factorization model. A value
          of None will use system time as the seed.
          (default: None)
        """
        prepared = cls._prepare(ratings)
        java_model = callMLlibFunc("trainALSModel", prepared, rank, iterations,
                                   lambda_, blocks, nonnegative, seed)
        return MatrixFactorizationModel(java_model)

    @classmethod
    @since("0.9.0")
    def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01,
                      nonnegative=False, seed=None):
        """
        Fit a matrix factorization model to 'implicit preferences' of users
        for a subset of products. The ratings matrix is approximated by the
        product of two lower-rank matrices of the requested rank (number of
        features); ALS is run iteratively, with a configurable level of
        parallelism, to solve for those features.

        :param ratings:
          RDD of `Rating` or (userID, productID, rating) tuple.
        :param rank:
          Number of features to use (also referred to as the number of latent factors).
        :param iterations:
          Number of iterations of ALS.
          (default: 5)
        :param lambda_:
          Regularization parameter.
          (default: 0.01)
        :param blocks:
          Number of blocks used to parallelize the computation. A value
          of -1 will use an auto-configured number of blocks.
          (default: -1)
        :param alpha:
          A constant used in computing confidence.
          (default: 0.01)
        :param nonnegative:
          A value of True will solve least-squares with nonnegativity
          constraints.
          (default: False)
        :param seed:
          Random seed for initial matrix factorization model. A value
          of None will use system time as the seed.
          (default: None)
        """
        prepared = cls._prepare(ratings)
        java_model = callMLlibFunc("trainImplicitALSModel", prepared, rank,
                                   iterations, lambda_, blocks, alpha, nonnegative, seed)
        return MatrixFactorizationModel(java_model)
0317 
0318 
def _test():
    """
    Run the doctests with a local SparkContext bound to ``sc`` and a
    SQLContext bound to ``sqlContext`` in the doctest globals.

    Exits the process with status -1 if any doctest fails.
    """
    import doctest
    import pyspark.mllib.recommendation
    from pyspark.sql import SQLContext
    globs = pyspark.mllib.recommendation.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        globs['sc'] = sc
        globs['sqlContext'] = SQLContext(sc)
        failure_count, _ = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Always release the SparkContext, even if setting up the SQLContext
        # or running the doctests raises.
        sc.stop()
    if failure_count:
        sys.exit(-1)
0331 
0332 
# Run the doctest suite when this file is executed as a script.
if __name__ == "__main__":
    _test()