Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 from shutil import rmtree
0019 import tempfile
0020 import unittest
0021 
0022 import numpy as np
0023 
0024 from pyspark.ml.classification import FMClassifier, LogisticRegression, \
0025     MultilayerPerceptronClassifier, OneVsRest
0026 from pyspark.ml.clustering import DistributedLDAModel, KMeans, LocalLDAModel, LDA, LDAModel
0027 from pyspark.ml.fpm import FPGrowth
0028 from pyspark.ml.linalg import Matrices, Vectors
0029 from pyspark.ml.recommendation import ALS
0030 from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
0031 from pyspark.sql import Row
0032 from pyspark.testing.mlutils import SparkSessionTestCase
0033 
0034 
class LogisticRegressionTest(SparkSessionTestCase):
    """Fits LogisticRegression with coefficient/intercept bounds and checks the fitted values."""

    def test_binomial_logistic_regression_with_bound(self):
        # Four weighted rows with a binary label and two features.
        rows = [(1.0, 1.0, Vectors.dense(0.0, 5.0)),
                (0.0, 2.0, Vectors.dense(1.0, 2.0)),
                (1.0, 3.0, Vectors.dense(2.0, 1.0)),
                (0.0, 4.0, Vectors.dense(3.0, 3.0))]
        df = self.spark.createDataFrame(rows, ["label", "weight", "features"])

        # Lower bound of -1.0 on both coefficients, upper bound of 0.0 on the intercept.
        coefficient_floor = Matrices.dense(1, 2, [-1.0, -1.0])
        intercept_ceiling = Vectors.dense(0.0)
        lor = LogisticRegression(regParam=0.01, weightCol="weight",
                                 lowerBoundsOnCoefficients=coefficient_floor,
                                 upperBoundsOnIntercepts=intercept_ceiling)
        model = lor.fit(df)

        coefficients_match = np.allclose(
            model.coefficients.toArray(), [-0.2944, -0.0484], atol=1E-4)
        self.assertTrue(coefficients_match)
        intercept_matches = np.isclose(model.intercept, 0.0, atol=1E-4)
        self.assertTrue(intercept_matches)

    def test_multinomial_logistic_regression_with_bound(self):
        # Multiclass sample data, read in libsvm format from a relative path.
        df = self.spark.read.format("libsvm").load(
            "data/mllib/sample_multiclass_classification_data.txt")

        # 3x4 lower-bound matrix filled with 0..11, and a 0.0 upper bound per intercept.
        coefficient_floor = Matrices.dense(3, 4, range(12))
        intercept_ceiling = Vectors.dense(0.0, 0.0, 0.0)
        lor = LogisticRegression(regParam=0.01,
                                 lowerBoundsOnCoefficients=coefficient_floor,
                                 upperBoundsOnIntercepts=intercept_ceiling)
        model = lor.fit(df)

        expected_rows = [[4.593, 4.5516, 9.0099, 12.2904],
                         [1.0, 8.1093, 7.0, 10.0],
                         [3.041, 5.0, 8.0, 11.0]]
        # Compare the coefficient matrix one row at a time.
        for row_index, expected_row in enumerate(expected_rows):
            actual_row = model.coefficientMatrix.toArray()[row_index]
            self.assertTrue(np.allclose(actual_row, expected_row, atol=1E-4))

        intercepts_match = np.allclose(
            model.interceptVector.toArray(), [-0.9057, -1.1392, -0.0033], atol=1E-4)
        self.assertTrue(intercepts_match)
0070 
0071 
class MultilayerPerceptronClassifierTest(SparkSessionTestCase):
    """Checks the prediction columns produced by a fitted MultilayerPerceptronClassifier."""

    def test_raw_and_probability_prediction(self):
        # Train on the multiclass sample data (libsvm format, relative path).
        data_path = "data/mllib/sample_multiclass_classification_data.txt"
        df = self.spark.read.format("libsvm").load(data_path)

        mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[4, 5, 4, 3],
                                             blockSize=128, seed=123)
        model = mlp.fit(df)

        # Score a single hand-written feature vector and inspect the one output row.
        test = self.sc.parallelize([Row(features=Vectors.dense(0.1, 0.1, 0.25, 0.25))]).toDF()
        result = model.transform(test).head()

        expected_prediction = 2.0
        expected_probability = [0.0, 0.0, 1.0]
        expected_rawPrediction = [-11.6081922998, -8.15827998691, 22.17757045]

        # assertEqual rather than assertTrue: assertTrue(value, msg) would take the expected
        # value as the failure message and pass for any truthy prediction.
        self.assertEqual(result.prediction, expected_prediction)
        self.assertTrue(np.allclose(result.probability, expected_probability, atol=1E-4))
        # Raw predictions are only compared to within an absolute tolerance of 1.
        self.assertTrue(np.allclose(result.rawPrediction, expected_rawPrediction, atol=1))
0090 
0091 
class OneVsRestTests(SparkSessionTestCase):
    """Exercises OneVsRest: param copying, output columns, parallelism and weightCol."""

    def _three_class_frame(self):
        # Three rows, one per label, used by the tests that need no weight column.
        rows = [(0.0, Vectors.dense(1.0, 0.8)),
                (1.0, Vectors.sparse(2, [], [])),
                (2.0, Vectors.dense(0.5, 0.5))]
        return self.spark.createDataFrame(rows, ["label", "features"])

    def test_copy(self):
        df = self._three_class_frame()
        base_lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr = OneVsRest(classifier=base_lr)

        # Copying with an override must change the copy's classifier only.
        ovr_copy = ovr.copy({base_lr.maxIter: 10})
        self.assertEqual(ovr.getClassifier().getMaxIter(), 5)
        self.assertEqual(ovr_copy.getClassifier().getMaxIter(), 10)

        # A fitted model's copy picks up the overridden prediction column.
        fitted = ovr.fit(df)
        fitted_copy = fitted.copy({fitted.predictionCol: "indexed"})
        self.assertEqual(fitted_copy.getPredictionCol(), "indexed")

    def test_output_columns(self):
        df = self._three_class_frame()
        base_lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr = OneVsRest(classifier=base_lr, parallelism=1)
        fitted = ovr.fit(df)
        transformed = fitted.transform(df)
        self.assertEqual(transformed.columns,
                         ["label", "features", "rawPrediction", "prediction"])

    def test_parallelism_doesnt_change_output(self):
        df = self._three_class_frame()
        serial_ovr = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01),
                               parallelism=1)
        serial_model = serial_ovr.fit(df)
        parallel_ovr = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01),
                                 parallelism=2)
        parallel_model = parallel_ovr.fit(df)

        # Sub-models at the same index must agree on coefficients and intercept.
        for idx, serial_sub in enumerate(serial_model.models):
            coefficients_match = np.allclose(
                serial_sub.coefficients.toArray(),
                parallel_model.models[idx].coefficients.toArray(), atol=1E-4)
            self.assertTrue(coefficients_match)
            intercepts_match = np.allclose(
                serial_sub.intercept, parallel_model.models[idx].intercept, atol=1E-4)
            self.assertTrue(intercepts_match)

    def test_support_for_weightCol(self):
        weighted_rows = [(0.0, Vectors.dense(1.0, 0.8), 1.0),
                         (1.0, Vectors.sparse(2, [], []), 1.0),
                         (2.0, Vectors.dense(0.5, 0.5), 1.0)]
        df = self.spark.createDataFrame(weighted_rows, ["label", "features", "weight"])

        # classifier inherits hasWeightCol
        weighted_lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr_with_lr = OneVsRest(classifier=weighted_lr, weightCol="weight")
        self.assertIsNotNone(ovr_with_lr.fit(df))

        # classifier doesn't inherit hasWeightCol
        fm = FMClassifier()
        ovr_with_fm = OneVsRest(classifier=fm, weightCol="weight")
        self.assertIsNotNone(ovr_with_fm.fit(df))
0146 
0147 
class KMeansTests(SparkSessionTestCase):
    """Checks KMeans clustering when distanceMeasure is "cosine"."""

    def test_kmeans_cosine_distance(self):
        # Consecutive points are listed in pairs that are expected to share a cluster.
        points = [[1.0, 1.0], [10.0, 10.0],
                  [1.0, 0.5], [10.0, 4.4],
                  [-1.0, 1.0], [-100.0, 90.0]]
        data = [(Vectors.dense(point),) for point in points]
        df = self.spark.createDataFrame(data, ["features"])

        kmeans = KMeans(k=3, seed=1, distanceMeasure="cosine")
        model = kmeans.fit(df)
        rows = model.transform(df).collect()

        # Rows (0, 1), (2, 3) and (4, 5) must each land in the same cluster.
        for left, right in ((0, 1), (2, 3), (4, 5)):
            self.assertTrue(rows[left].prediction == rows[right].prediction)
0161 
0162 
class LDATest(SparkSessionTestCase):
    """Round-trips an LDA estimator and its fitted models through save/load."""

    def _compare(self, m1, m2):
        """
        Temp method for comparing instances.
        TODO: Replace with generic implementation once SPARK-14706 is merged.

        Asserts that ``m1`` and ``m2`` have the same uid, type and number of params, that
        every param defined on ``m1`` has an equal value and an equal parent on ``m2``,
        and, for LDAModel instances, that vocabulary size and topics matrix are equal.
        """
        self.assertEqual(m1.uid, m2.uid)
        self.assertEqual(type(m1), type(m2))
        self.assertEqual(len(m1.params), len(m2.params))
        for p in m1.params:
            if m1.isDefined(p):
                self.assertEqual(m1.getOrDefault(p), m2.getOrDefault(p))
                self.assertEqual(p.parent, m2.getParam(p.name).parent)
        if isinstance(m1, LDAModel):
            self.assertEqual(m1.vocabSize(), m2.vocabSize())
            self.assertEqual(m1.topicsMatrix(), m2.topicsMatrix())

    def test_persistence(self):
        # Test save/load for LDA, LocalLDAModel, DistributedLDAModel.
        df = self.spark.createDataFrame([
            [1, Vectors.dense([0.0, 1.0])],
            [2, Vectors.sparse(2, {0: 1.0})],
        ], ["id", "features"])
        # Fit model
        lda = LDA(k=2, seed=1, optimizer="em")
        distributedModel = lda.fit(df)
        self.assertTrue(distributedModel.isDistributed())
        localModel = distributedModel.toLocal()
        self.assertFalse(localModel.isDistributed())
        # Define paths
        path = tempfile.mkdtemp()
        try:
            lda_path = path + "/lda"
            dist_model_path = path + "/distLDAModel"
            local_model_path = path + "/localLDAModel"
            # Test LDA
            lda.save(lda_path)
            lda2 = LDA.load(lda_path)
            self._compare(lda, lda2)
            # Test DistributedLDAModel
            distributedModel.save(dist_model_path)
            distributedModel2 = DistributedLDAModel.load(dist_model_path)
            self._compare(distributedModel, distributedModel2)
            # Test LocalLDAModel
            localModel.save(local_model_path)
            localModel2 = LocalLDAModel.load(local_model_path)
            self._compare(localModel, localModel2)
        finally:
            # Clean up in a finally block so a failing save/load/compare step does not
            # leak the temporary directory. Removal stays best-effort.
            try:
                rmtree(path)
            except OSError:
                pass
0215 
0216 
class FPGrowthTests(SparkSessionTestCase):
    """Checks the association rules and frequent itemsets produced by a default FPGrowth."""

    def setUp(self):
        super(FPGrowthTests, self).setUp()
        # Four single-column rows of item lists, rebuilt before every test.
        self.data = self.spark.createDataFrame(
            [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )],
            ["items"])

    def test_association_rules(self):
        fp = FPGrowth()
        fpm = fp.fit(self.data)

        expected_association_rules = self.spark.createDataFrame(
            [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)],
            ["antecedent", "consequent", "confidence", "lift"]
        )
        actual_association_rules = fpm.associationRules

        # Subtracting in both directions and finding nothing left means neither frame
        # has a row the other lacks, regardless of row order.
        self.assertEqual(actual_association_rules.subtract(expected_association_rules).count(), 0)
        self.assertEqual(expected_association_rules.subtract(actual_association_rules).count(), 0)

    def test_freq_itemsets(self):
        fp = FPGrowth()
        fpm = fp.fit(self.data)

        expected_freq_itemsets = self.spark.createDataFrame(
            [([1], 4), ([2], 3), ([2, 1], 3), ([3], 2), ([3, 1], 2)],
            ["items", "freq"]
        )
        actual_freq_itemsets = fpm.freqItemsets

        # Same two-way subtraction check as for the association rules.
        self.assertEqual(actual_freq_itemsets.subtract(expected_freq_itemsets).count(), 0)
        self.assertEqual(expected_freq_itemsets.subtract(actual_freq_itemsets).count(), 0)

    def tearDown(self):
        del self.data
        # Mirror setUp, which chains to the base class: run any base-class per-test
        # cleanup after this class's own fixture has been dropped.
        super(FPGrowthTests, self).tearDown()
0252 
0253 
class ALSTest(SparkSessionTestCase):
    """Checks the storage-level params reported by ALS and by its ``_java_obj``."""

    def _assert_storage_levels(self, als, intermediate, final):
        # Both the Python estimator and its `_java_obj` must report the expected levels.
        self.assertEqual(als.getIntermediateStorageLevel(), intermediate)
        self.assertEqual(als._java_obj.getIntermediateStorageLevel(), intermediate)
        self.assertEqual(als.getFinalStorageLevel(), final)
        self.assertEqual(als._java_obj.getFinalStorageLevel(), final)

    def test_storage_levels(self):
        ratings = [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0),
                   (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)]
        df = self.spark.createDataFrame(ratings, ["user", "item", "rating"])
        als = ALS().setMaxIter(1).setRank(1)

        # test default params
        als.fit(df)
        self._assert_storage_levels(als, "MEMORY_AND_DISK", "MEMORY_AND_DISK")

        # test non-default params
        als.setIntermediateStorageLevel("MEMORY_ONLY_2")
        als.setFinalStorageLevel("DISK_ONLY")
        als.fit(df)
        self._assert_storage_levels(als, "MEMORY_ONLY_2", "DISK_ONLY")
0275 
0276 
class GeneralizedLinearRegressionTest(SparkSessionTestCase):
    """Fits GeneralizedLinearRegression on small frames and checks the fitted parameters."""

    def test_tweedie_distribution(self):
        rows = [(1.0, Vectors.dense(0.0, 0.0)),
                (1.0, Vectors.dense(1.0, 2.0)),
                (2.0, Vectors.dense(0.0, 0.0)),
                (2.0, Vectors.dense(1.0, 1.0))]
        df = self.spark.createDataFrame(rows, ["label", "features"])

        glr = GeneralizedLinearRegression(family="tweedie", variancePower=1.6)

        # First fit: the estimator exactly as constructed.
        first_model = glr.fit(df)
        first_coefficients = first_model.coefficients.toArray()
        self.assertTrue(np.allclose(first_coefficients, [-0.4645, 0.3402], atol=1E-4))
        self.assertTrue(np.isclose(first_model.intercept, 0.7841, atol=1E-4))

        # Second fit: whatever setLinkPower(-1.0) returns is fitted on the same data.
        relinked = glr.setLinkPower(-1.0)
        second_model = relinked.fit(df)
        second_coefficients = second_model.coefficients.toArray()
        self.assertTrue(np.allclose(second_coefficients, [-0.6667, 0.5], atol=1E-4))
        self.assertTrue(np.isclose(second_model.intercept, 0.6667, atol=1E-4))

    def test_offset(self):
        rows = [(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0)),
                (0.5, 2.1, 0.5, Vectors.dense(1.0, 2.0)),
                (0.9, 0.4, 1.0, Vectors.dense(2.0, 1.0)),
                (0.7, 0.7, 0.0, Vectors.dense(3.0, 3.0))]
        df = self.spark.createDataFrame(rows, ["label", "weight", "offset", "features"])

        # Poisson family using both the weight and the offset columns.
        glr = GeneralizedLinearRegression(family="poisson", weightCol="weight",
                                          offsetCol="offset")
        model = glr.fit(df)

        coefficients_match = np.allclose(
            model.coefficients.toArray(), [0.664647, -0.3192581], atol=1E-4)
        self.assertTrue(coefficients_match)
        intercept_matches = np.isclose(model.intercept, -1.561613, atol=1E-4)
        self.assertTrue(intercept_matches)
0309 
0310 
class LinearRegressionTest(SparkSessionTestCase):
    """Fits LinearRegression with loss="huber" and checks the fitted parameters."""

    def test_linear_regression_with_huber_loss(self):
        # Regression sample data, read in libsvm format from a relative path.
        df = self.spark.read.format("libsvm").load(
            "data/mllib/sample_linear_regression_data.txt")

        model = LinearRegression(loss="huber", epsilon=2.0).fit(df)

        # Reference values; each is compared with an absolute tolerance of 1E-3.
        expected = {
            "coefficients": [0.136, 0.7648, -0.7761, 2.4236, 0.537,
                             1.2612, -0.333, -0.5694, -0.6311, 0.6053],
            "intercept": 0.1607,
            "scale": 9.758,
        }

        coefficients_match = np.allclose(
            model.coefficients.toArray(), expected["coefficients"], atol=1E-3)
        self.assertTrue(coefficients_match)
        intercept_matches = np.isclose(model.intercept, expected["intercept"], atol=1E-3)
        self.assertTrue(intercept_matches)
        scale_matches = np.isclose(model.scale, expected["scale"], atol=1E-3)
        self.assertTrue(scale_matches)
0330 
0331 
if __name__ == "__main__":
    from pyspark.ml.tests.test_algorithms import *

    # Use the XML-reporting runner when xmlrunner can be imported; otherwise keep None
    # and hand that to unittest.main.
    testRunner = None
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)