Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import os
0019 import tempfile
0020 from shutil import rmtree
0021 import unittest
0022 
0023 from numpy import array, array_equal
0024 from py4j.protocol import Py4JJavaError
0025 
0026 from pyspark.mllib.fpm import FPGrowth
0027 from pyspark.mllib.recommendation import Rating
0028 from pyspark.mllib.regression import LabeledPoint
0029 from pyspark.serializers import PickleSerializer
0030 from pyspark.testing.mllibutils import MLlibTestCase
0031 
0032 
class ListTests(MLlibTestCase):

    """
    Test MLlib algorithms on plain lists, to make sure they're passed through
    as NumPy arrays.
    """

    def _assert_alternating_predictions(self, model, features):
        """
        Check ``model.predict`` against the sample ordering used by the
        classification and regression fixtures below.

        Both fixtures list their samples so that even positions carry the
        low label and odd positions carry the high label, hence the expected
        prediction is ``<= 0`` at even positions and ``> 0`` at odd positions.

        :param model: any trained model exposing ``predict(point)``.
        :param features: list of feature lists, in fixture order.
        """
        for position, point in enumerate(features):
            prediction = model.predict(point)
            if position % 2 == 0:
                self.assertLessEqual(
                    prediction, 0, "sample %d should be predicted <= 0" % position)
            else:
                self.assertGreater(
                    prediction, 0, "sample %d should be predicted > 0" % position)

    def _assert_save_load_round_trip(self, model, model_class, path):
        """
        Save ``model`` under ``path``, load it back through ``model_class`` and
        check that both models render the same debug string.
        """
        model.save(self.sc, path)
        reloaded = model_class.load(self.sc, path)
        self.assertEqual(reloaded.toDebugString(), model.toDebugString())

    def test_bisecting_kmeans(self):
        """Predictions and costs must agree between a single point and an RDD holding it."""
        from pyspark.mllib.clustering import BisectingKMeans
        points = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2)
        model = BisectingKMeans().train(self.sc.parallelize(points, 2), k=4)

        origin = array([0.0, 0.0])
        origin_rdd = self.sc.parallelize([origin])

        local_prediction = model.predict(origin)
        distributed_prediction = model.predict(origin_rdd).first()
        self.assertEqual(local_prediction, distributed_prediction)

        local_cost = model.computeCost(origin)
        distributed_cost = model.computeCost(origin_rdd)
        self.assertEqual(local_cost, distributed_cost)

        self.assertEqual(model.k, len(model.clusterCenters))

    def test_kmeans(self):
        """Points given as plain lists must land in the same cluster as their neighbour."""
        from pyspark.mllib.clustering import KMeans
        data = [
            [0, 1.1],
            [0, 1.2],
            [1.1, 0],
            [1.2, 0],
        ]
        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||",
                                initializationSteps=7, epsilon=1e-4)
        assignments = [clusters.predict(point) for point in data]
        self.assertEqual(assignments[0], assignments[1])
        self.assertEqual(assignments[2], assignments[3])

    def test_kmeans_deterministic(self):
        """Two KMeans runs with the same seed must produce identical centers."""
        from pyspark.mllib.clustering import KMeans
        data = [[v, v] for v in range(0, 100, 10)]
        # Each run gets its own freshly parallelized copy of the data.
        first_run, second_run = [
            KMeans.train(self.sc.parallelize(data),
                         3, initializationMode="k-means||",
                         seed=42, initializationSteps=7, epsilon=1e-4)
            for _ in range(2)
        ]
        for c1, c2 in zip(first_run.centers, second_run.centers):
            # TODO: Allow small numeric difference.
            self.assertTrue(array_equal(c1, c2))

    def test_gmm(self):
        """Neighbouring points must receive the same GaussianMixture label."""
        from pyspark.mllib.clustering import GaussianMixture
        data = self.sc.parallelize([
            [1, 2],
            [8, 9],
            [-4, -3],
            [-6, -7],
        ])
        clusters = GaussianMixture.train(data, 2, convergenceTol=0.001,
                                         maxIterations=10, seed=1)
        labels = clusters.predict(data).collect()
        self.assertEqual(labels[0], labels[1])
        self.assertEqual(labels[2], labels[3])

    def test_gmm_deterministic(self):
        """Two GaussianMixture runs with the same seed must agree on the weights."""
        from pyspark.mllib.clustering import GaussianMixture
        data = self.sc.parallelize([[v, v] for v in range(0, 100, 10)])
        first_run, second_run = [
            GaussianMixture.train(data, 5, convergenceTol=0.001,
                                  maxIterations=10, seed=63)
            for _ in range(2)
        ]
        for w1, w2 in zip(first_run.weights, second_run.weights):
            self.assertEqual(round(w1, 7), round(w2, 7))

    def test_gmm_with_initial_model(self):
        """Re-training from an already trained model must leave the weights unchanged."""
        from pyspark.mllib.clustering import GaussianMixture
        data = self.sc.parallelize([
            (-10, -5), (-9, -4), (10, 5), (9, 4)
        ])

        gmm1 = GaussianMixture.train(data, 2, convergenceTol=0.001,
                                     maxIterations=10, seed=63)
        gmm2 = GaussianMixture.train(data, 2, convergenceTol=0.001,
                                     maxIterations=10, seed=63, initialModel=gmm1)
        # Sum the absolute differences: signed differences can cancel each
        # other out, which would let the check pass for differing weights.
        self.assertAlmostEqual(abs(gmm1.weights - gmm2.weights).sum(), 0.0)

    def test_classification(self):
        """
        Train each classifier on a small labelled set and check its predictions;
        tree-based models are additionally saved to and reloaded from disk.
        """
        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
        from pyspark.mllib.tree import DecisionTree, DecisionTreeModel, RandomForest, \
            RandomForestModel, GradientBoostedTrees, GradientBoostedTreesModel
        data = [
            LabeledPoint(0.0, [1, 0, 0]),
            LabeledPoint(1.0, [0, 1, 1]),
            LabeledPoint(0.0, [2, 0, 0]),
            LabeledPoint(1.0, [0, 2, 1])
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]

        temp_dir = tempfile.mkdtemp()
        # Registered up front so the directory is removed even when training
        # or an assertion fails part-way through; removal stays best-effort.
        self.addCleanup(rmtree, temp_dir, ignore_errors=True)

        lr_model = LogisticRegressionWithSGD.train(rdd, iterations=10)
        self._assert_alternating_predictions(lr_model, features)

        svm_model = SVMWithSGD.train(rdd, iterations=10)
        self._assert_alternating_predictions(svm_model, features)

        nb_model = NaiveBayes.train(rdd)
        self._assert_alternating_predictions(nb_model, features)

        categoricalFeaturesInfo = {0: 3}  # feature 0 has 3 categories
        dt_model = DecisionTree.trainClassifier(
            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
        self._assert_alternating_predictions(dt_model, features)
        self._assert_save_load_round_trip(
            dt_model, DecisionTreeModel, os.path.join(temp_dir, "dt"))

        rf_model = RandomForest.trainClassifier(
            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10,
            maxBins=4, seed=1)
        self._assert_alternating_predictions(rf_model, features)
        self._assert_save_load_round_trip(
            rf_model, RandomForestModel, os.path.join(temp_dir, "rf"))

        gbt_model = GradientBoostedTrees.trainClassifier(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
        self._assert_alternating_predictions(gbt_model, features)
        self._assert_save_load_round_trip(
            gbt_model, GradientBoostedTreesModel, os.path.join(temp_dir, "gbt"))

    def test_regression(self):
        """
        Train each regressor on a small labelled set and check the sign of its
        predictions, then check that ``initialWeights`` and ``maxBins`` are
        accepted / validated.
        """
        from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
            RidgeRegressionWithSGD
        from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
        data = [
            LabeledPoint(-1.0, [0, -1]),
            LabeledPoint(1.0, [0, 1]),
            LabeledPoint(-1.0, [0, -2]),
            LabeledPoint(1.0, [0, 2])
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]

        sgd_trainers = (LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD)
        for trainer in sgd_trainers:
            self._assert_alternating_predictions(trainer.train(rdd, iterations=10), features)

        categoricalFeaturesInfo = {0: 2}  # feature 0 has 2 categories
        dt_model = DecisionTree.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
        self._assert_alternating_predictions(dt_model, features)

        rf_model = RandomForest.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, maxBins=4, seed=1)
        self._assert_alternating_predictions(rf_model, features)

        gbt_model = GradientBoostedTrees.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
        self._assert_alternating_predictions(gbt_model, features)

        # Passing initialWeights as a NumPy array must not raise ValueError.
        try:
            for trainer in sgd_trainers:
                trainer.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
        except ValueError as e:
            self.fail("training with NumPy initialWeights raised ValueError: %s" % e)

        # Verify that maxBins is being passed through
        GradientBoostedTrees.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4, maxBins=32)
        with self.assertRaises(Exception):
            GradientBoostedTrees.trainRegressor(
                rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4, maxBins=1)
0261 
0262 
class ALSTests(MLlibTestCase):

    """
    Test that ``Rating`` objects pass between the Python pickler and the
    JVM-side MLlib ``SerDe``.
    """

    def test_als_ratings_serialize(self):
        """A Rating sent to the JVM and back must keep its user, product and rating."""
        serde = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe
        pickler = PickleSerializer()
        original = Rating(7, 1123, 3.14)

        java_rating = serde.loads(bytearray(pickler.dumps(original)))
        restored = pickler.loads(bytes(serde.dumps(java_rating)))

        self.assertEqual(original.user, restored.user)
        self.assertEqual(original.product, restored.product)
        self.assertAlmostEqual(original.rating, restored.rating, 2)

    def test_als_ratings_id_long_error(self):
        """Loading a Rating whose ids are too large must raise Py4JJavaError."""
        serde = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe
        pickler = PickleSerializer()
        oversized = Rating(1205640308657491975, 50233468418, 1.0)
        # Pickle before entering the assertion so only the JVM call is guarded.
        payload = bytearray(pickler.dumps(oversized))
        # rating user id exceeds max int value, should fail when pickled
        with self.assertRaises(Py4JJavaError):
            serde.loads(payload)
0280 
0281 
class FPGrowthTest(MLlibTestCase):

    """
    Test that FPGrowth finds the same frequent itemsets with and without an
    explicit number of partitions.
    """

    def test_fpgrowth(self):
        """Compare the itemsets of a 2-partition run against a default-partition run."""
        transactions = [
            ["a", "b", "c"],
            ["a", "b", "d", "e"],
            ["a", "c", "e"],
            ["a", "c", "f"],
        ]
        rdd = self.sc.parallelize(transactions, 2)
        explicit_partitions = FPGrowth.train(rdd, 0.6, 2)
        # use default data partition number when numPartitions is not specified
        default_partitions = FPGrowth.train(rdd, 0.6)

        explicit_itemsets = sorted(explicit_partitions.freqItemsets().collect())
        default_itemsets = sorted(default_partitions.freqItemsets().collect())
        self.assertEqual(explicit_itemsets, default_itemsets)
0292 
0293 
if __name__ == "__main__":
    from pyspark.mllib.tests.test_algorithms import *

    # Default to unittest's built-in runner; switch to the XML-reporting
    # runner only when the optional xmlrunner package can be imported.
    testRunner = None
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        # xmlrunner is optional; keep the default runner.
        pass
    unittest.main(testRunner=testRunner, verbosity=2)