0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import os
0019 import tempfile
0020 from shutil import rmtree
0021 import unittest
0022
0023 from numpy import array, array_equal
0024 from py4j.protocol import Py4JJavaError
0025
0026 from pyspark.mllib.fpm import FPGrowth
0027 from pyspark.mllib.recommendation import Rating
0028 from pyspark.mllib.regression import LabeledPoint
0029 from pyspark.serializers import PickleSerializer
0030 from pyspark.testing.mllibutils import MLlibTestCase
0031
0032
class ListTests(MLlibTestCase):

    """
    Test MLlib algorithms on plain lists, to make sure they're passed through
    as NumPy arrays.
    """

    def _assert_alternating_predictions(self, model, features):
        """
        Assert that ``model`` separates the four-row fixtures used below.

        In both ``test_classification`` and ``test_regression`` the rows at even
        indices carry the non-positive label and must predict ``<= 0``, while
        the rows at odd indices carry the positive label and must predict ``> 0``.

        :param model: any trained MLlib model exposing ``predict(features)``.
        :param features: list of plain-Python feature lists, one per data row.
        """
        for i, point in enumerate(features):
            prediction = model.predict(point)
            if i % 2 == 0:
                self.assertLessEqual(prediction, 0)
            else:
                self.assertGreater(prediction, 0)

    def test_bisecting_kmeans(self):
        from pyspark.mllib.clustering import BisectingKMeans
        data = array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2)
        bskm = BisectingKMeans()
        model = bskm.train(self.sc.parallelize(data, 2), k=4)
        p = array([0.0, 0.0])
        rdd_p = self.sc.parallelize([p])
        # A single vector and a one-element RDD of that vector must agree.
        self.assertEqual(model.predict(p), model.predict(rdd_p).first())
        self.assertEqual(model.computeCost(p), model.computeCost(rdd_p))
        self.assertEqual(model.k, len(model.clusterCenters))

    def test_kmeans(self):
        from pyspark.mllib.clustering import KMeans
        data = [
            [0, 1.1],
            [0, 1.2],
            [1.1, 0],
            [1.2, 0],
        ]
        clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||",
                                initializationSteps=7, epsilon=1e-4)
        self.assertEqual(clusters.predict(data[0]), clusters.predict(data[1]))
        self.assertEqual(clusters.predict(data[2]), clusters.predict(data[3]))

    def test_kmeans_deterministic(self):
        from pyspark.mllib.clustering import KMeans
        x = range(0, 100, 10)
        y = range(0, 100, 10)
        data = [[a, b] for a, b in zip(x, y)]
        clusters1 = KMeans.train(self.sc.parallelize(data),
                                 3, initializationMode="k-means||",
                                 seed=42, initializationSteps=7, epsilon=1e-4)
        clusters2 = KMeans.train(self.sc.parallelize(data),
                                 3, initializationMode="k-means||",
                                 seed=42, initializationSteps=7, epsilon=1e-4)
        centers1 = clusters1.centers
        centers2 = clusters2.centers
        # zip() stops at the shorter sequence, so compare the counts first;
        # otherwise a differing number of centers would go unnoticed.
        self.assertEqual(len(centers1), len(centers2))
        for c1, c2 in zip(centers1, centers2):
            self.assertTrue(array_equal(c1, c2))

    def test_gmm(self):
        from pyspark.mllib.clustering import GaussianMixture
        data = self.sc.parallelize([
            [1, 2],
            [8, 9],
            [-4, -3],
            [-6, -7],
        ])
        clusters = GaussianMixture.train(data, 2, convergenceTol=0.001,
                                         maxIterations=10, seed=1)
        labels = clusters.predict(data).collect()
        self.assertEqual(labels[0], labels[1])
        self.assertEqual(labels[2], labels[3])

    def test_gmm_deterministic(self):
        from pyspark.mllib.clustering import GaussianMixture
        x = range(0, 100, 10)
        y = range(0, 100, 10)
        data = self.sc.parallelize([[a, b] for a, b in zip(x, y)])
        clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001,
                                          maxIterations=10, seed=63)
        clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001,
                                          maxIterations=10, seed=63)
        # Guard against zip() silently truncating mismatched weight vectors.
        self.assertEqual(len(clusters1.weights), len(clusters2.weights))
        for c1, c2 in zip(clusters1.weights, clusters2.weights):
            self.assertEqual(round(c1, 7), round(c2, 7))

    def test_gmm_with_initial_model(self):
        from pyspark.mllib.clustering import GaussianMixture
        data = self.sc.parallelize([
            (-10, -5), (-9, -4), (10, 5), (9, 4)
        ])

        gmm1 = GaussianMixture.train(data, 2, convergenceTol=0.001,
                                     maxIterations=10, seed=63)
        gmm2 = GaussianMixture.train(data, 2, convergenceTol=0.001,
                                     maxIterations=10, seed=63, initialModel=gmm1)
        # Mixture weights sum to 1 for both models, so the *signed* sum of their
        # differences is ~0 for any pair of models. Sum absolute differences
        # so the assertion can actually detect diverging weights.
        self.assertAlmostEqual(abs(gmm1.weights - gmm2.weights).sum(), 0.0)

    def test_classification(self):
        from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
        from pyspark.mllib.tree import DecisionTree, DecisionTreeModel, RandomForest, \
            RandomForestModel, GradientBoostedTrees, GradientBoostedTreesModel
        data = [
            LabeledPoint(0.0, [1, 0, 0]),
            LabeledPoint(1.0, [0, 1, 1]),
            LabeledPoint(0.0, [2, 0, 0]),
            LabeledPoint(1.0, [0, 2, 1])
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]

        # Register removal up front so the directory is deleted even when an
        # assertion below fails (the previous trailing rmtree was skipped then).
        temp_dir = tempfile.mkdtemp()
        self.addCleanup(rmtree, temp_dir, ignore_errors=True)

        lr_model = LogisticRegressionWithSGD.train(rdd, iterations=10)
        self._assert_alternating_predictions(lr_model, features)

        svm_model = SVMWithSGD.train(rdd, iterations=10)
        self._assert_alternating_predictions(svm_model, features)

        nb_model = NaiveBayes.train(rdd)
        self._assert_alternating_predictions(nb_model, features)

        categoricalFeaturesInfo = {0: 3}
        dt_model = DecisionTree.trainClassifier(
            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
        self._assert_alternating_predictions(dt_model, features)

        # Save/load round trip must reproduce the same tree structure.
        dt_model_dir = os.path.join(temp_dir, "dt")
        dt_model.save(self.sc, dt_model_dir)
        same_dt_model = DecisionTreeModel.load(self.sc, dt_model_dir)
        self.assertEqual(same_dt_model.toDebugString(), dt_model.toDebugString())

        rf_model = RandomForest.trainClassifier(
            rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10,
            maxBins=4, seed=1)
        self._assert_alternating_predictions(rf_model, features)

        rf_model_dir = os.path.join(temp_dir, "rf")
        rf_model.save(self.sc, rf_model_dir)
        same_rf_model = RandomForestModel.load(self.sc, rf_model_dir)
        self.assertEqual(same_rf_model.toDebugString(), rf_model.toDebugString())

        gbt_model = GradientBoostedTrees.trainClassifier(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
        self._assert_alternating_predictions(gbt_model, features)

        gbt_model_dir = os.path.join(temp_dir, "gbt")
        gbt_model.save(self.sc, gbt_model_dir)
        same_gbt_model = GradientBoostedTreesModel.load(self.sc, gbt_model_dir)
        self.assertEqual(same_gbt_model.toDebugString(), gbt_model.toDebugString())

    def test_regression(self):
        from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
            RidgeRegressionWithSGD
        from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
        data = [
            LabeledPoint(-1.0, [0, -1]),
            LabeledPoint(1.0, [0, 1]),
            LabeledPoint(-1.0, [0, -2]),
            LabeledPoint(1.0, [0, 2])
        ]
        rdd = self.sc.parallelize(data)
        features = [p.features.tolist() for p in data]

        lr_model = LinearRegressionWithSGD.train(rdd, iterations=10)
        self._assert_alternating_predictions(lr_model, features)

        lasso_model = LassoWithSGD.train(rdd, iterations=10)
        self._assert_alternating_predictions(lasso_model, features)

        rr_model = RidgeRegressionWithSGD.train(rdd, iterations=10)
        self._assert_alternating_predictions(rr_model, features)

        categoricalFeaturesInfo = {0: 2}
        dt_model = DecisionTree.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
        self._assert_alternating_predictions(dt_model, features)

        rf_model = RandomForest.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, maxBins=4, seed=1)
        self._assert_alternating_predictions(rf_model, features)

        gbt_model = GradientBoostedTrees.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
        self._assert_alternating_predictions(gbt_model, features)

        # initialWeights passed as a NumPy array must be accepted by every
        # SGD-based regressor.
        try:
            LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
            LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
            RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
        except ValueError as e:
            self.fail("initialWeights as a NumPy array was rejected: %s" % e)

        # maxBins=32 is accepted; maxBins=1 must be rejected.
        GradientBoostedTrees.trainRegressor(
            rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4, maxBins=32)
        with self.assertRaises(Exception):
            GradientBoostedTrees.trainRegressor(
                rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4, maxBins=1)
0261
0262
class ALSTests(MLlibTestCase):
    """Checks for moving ``Rating`` objects across the Python/JVM boundary."""

    def test_als_ratings_serialize(self):
        # Pickle a Rating, load it on the JVM side, dump it back and unpickle it.
        serde = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe
        pickler = PickleSerializer()
        original = Rating(7, 1123, 3.14)
        java_rating = serde.loads(bytearray(pickler.dumps(original)))
        restored = pickler.loads(bytes(serde.dumps(java_rating)))
        self.assertEqual(original.user, restored.user)
        self.assertEqual(original.product, restored.product)
        self.assertAlmostEqual(original.rating, restored.rating, 2)

    def test_als_ratings_id_long_error(self):
        # User/product ids this large are expected to be rejected by the JVM
        # deserializer with a Py4JJavaError.
        serde = self.sc._jvm.org.apache.spark.mllib.api.python.SerDe
        pickler = PickleSerializer()
        oversized = Rating(1205640308657491975, 50233468418, 1.0)
        payload = bytearray(pickler.dumps(oversized))
        with self.assertRaises(Py4JJavaError):
            serde.loads(payload)
0280
0281
class FPGrowthTest(MLlibTestCase):

    def test_fpgrowth(self):
        """Training with and without the optional third argument to
        ``FPGrowth.train`` must yield the same frequent itemsets."""
        transactions = [
            ["a", "b", "c"],
            ["a", "b", "d", "e"],
            ["a", "c", "e"],
            ["a", "c", "f"],
        ]
        rdd = self.sc.parallelize(transactions, 2)
        with_extra_arg = FPGrowth.train(rdd, 0.6, 2)
        with_default = FPGrowth.train(rdd, 0.6)
        expected = sorted(with_extra_arg.freqItemsets().collect())
        actual = sorted(with_default.freqItemsets().collect())
        self.assertEqual(expected, actual)
0292
0293
if __name__ == "__main__":
    # Re-import this module's tests so unittest.main discovers them when the
    # file is executed directly.
    from pyspark.mllib.tests.test_algorithms import *

    # Emit JUnit-style XML reports when xmlrunner is installed; otherwise fall
    # back to unittest's default text runner.
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None

    unittest.main(testRunner=testRunner, verbosity=2)