#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from shutil import rmtree
import tempfile
import unittest

import numpy as np

from pyspark.ml.classification import FMClassifier, LogisticRegression, \
    MultilayerPerceptronClassifier, OneVsRest
from pyspark.ml.clustering import DistributedLDAModel, KMeans, LocalLDAModel, LDA, LDAModel
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.linalg import Matrices, Vectors
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
from pyspark.sql import Row
from pyspark.testing.mlutils import SparkSessionTestCase


class LogisticRegressionTest(SparkSessionTestCase):

    def test_binomial_logistic_regression_with_bound(self):
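        # Fit a weighted binomial model under box constraints: each coefficient
        # is bounded below by -1.0 and the intercept is bounded above by 0.0.
        # The solution is checked against precomputed reference values.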
        df = self.spark.createDataFrame(
            [(1.0, 1.0, Vectors.dense(0.0, 5.0)),
             (0.0, 2.0, Vectors.dense(1.0, 2.0)),
             (1.0, 3.0, Vectors.dense(2.0, 1.0)),
             (0.0, 4.0, Vectors.dense(3.0, 3.0))], ["label", "weight", "features"])

        lor = LogisticRegression(regParam=0.01, weightCol="weight",
                                 lowerBoundsOnCoefficients=Matrices.dense(1, 2, [-1.0, -1.0]),
                                 upperBoundsOnIntercepts=Vectors.dense(0.0))
        model = lor.fit(df)
        self.assertTrue(
            np.allclose(model.coefficients.toArray(), [-0.2944, -0.0484], atol=1E-4))
        self.assertTrue(np.isclose(model.intercept, 0.0, atol=1E-4))

    def test_multinomial_logistic_regression_with_bound(self):
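        # Multinomial variant: lower-bound every entry of the 3x4 coefficient
        # matrix with the values 0 through 11 and cap all three intercepts at
        # 0.0, then compare against precomputed reference values row by row.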
        data_path = "data/mllib/sample_multiclass_classification_data.txt"
        df = self.spark.read.format("libsvm").load(data_path)

        lor = LogisticRegression(regParam=0.01,
                                 lowerBoundsOnCoefficients=Matrices.dense(3, 4, range(12)),
                                 upperBoundsOnIntercepts=Vectors.dense(0.0, 0.0, 0.0))
        model = lor.fit(df)
        expected = [[4.593, 4.5516, 9.0099, 12.2904],
                    [1.0, 8.1093, 7.0, 10.0],
                    [3.041, 5.0, 8.0, 11.0]]
        for i in range(len(expected)):
            self.assertTrue(
                np.allclose(model.coefficientMatrix.toArray()[i], expected[i], atol=1E-4))
        self.assertTrue(
            np.allclose(model.interceptVector.toArray(), [-0.9057, -1.1392, -0.0033], atol=1E-4))


class MultilayerPerceptronClassifierTest(SparkSessionTestCase):

    def test_raw_and_probability_prediction(self):
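        # Train a small feed-forward network (layers 4-5-4-3) and check that
        # prediction, probability, and rawPrediction for a single test row match
        # reference values; rawPrediction gets a loose tolerance (atol=1) since
        # it is not a normalized quantity.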
        data_path = "data/mllib/sample_multiclass_classification_data.txt"
        df = self.spark.read.format("libsvm").load(data_path)

        mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[4, 5, 4, 3],
                                             blockSize=128, seed=123)
        model = mlp.fit(df)
        test = self.sc.parallelize([Row(features=Vectors.dense(0.1, 0.1, 0.25, 0.25))]).toDF()
        result = model.transform(test).head()
        expected_prediction = 2.0
        expected_probability = [0.0, 0.0, 1.0]
        expected_rawPrediction = [-11.6081922998, -8.15827998691, 22.17757045]
        self.assertEqual(result.prediction, expected_prediction)
        self.assertTrue(np.allclose(result.probability, expected_probability, atol=1E-4))
        self.assertTrue(np.allclose(result.rawPrediction, expected_rawPrediction, atol=1))


class OneVsRestTests(SparkSessionTestCase):

    def test_copy(self):
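        # copy() with a param override must return a new instance and leave the
        # original untouched, for both the estimator and the fitted model.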
        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                         (1.0, Vectors.sparse(2, [], [])),
                                         (2.0, Vectors.dense(0.5, 0.5))],
                                        ["label", "features"])
        lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr = OneVsRest(classifier=lr)
        ovr1 = ovr.copy({lr.maxIter: 10})
        self.assertEqual(ovr.getClassifier().getMaxIter(), 5)
        self.assertEqual(ovr1.getClassifier().getMaxIter(), 10)
        model = ovr.fit(df)
        model1 = model.copy({model.predictionCol: "indexed"})
        self.assertEqual(model1.getPredictionCol(), "indexed")

    def test_output_columns(self):
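        # Transforming should append exactly the rawPrediction and prediction
        # columns to the input schema, in that order.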
        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                         (1.0, Vectors.sparse(2, [], [])),
                                         (2.0, Vectors.dense(0.5, 0.5))],
                                        ["label", "features"])
        lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr = OneVsRest(classifier=lr, parallelism=1)
        model = ovr.fit(df)
        output = model.transform(df)
        self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"])

    def test_parallelism_doesnt_change_output(self):
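        # Fitting the same data with parallelism=1 and parallelism=2 must
        # produce numerically equivalent per-class models.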
        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                         (1.0, Vectors.sparse(2, [], [])),
                                         (2.0, Vectors.dense(0.5, 0.5))],
                                        ["label", "features"])
        ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1)
        modelPar1 = ovrPar1.fit(df)
        ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
        modelPar2 = ovrPar2.fit(df)
        for i, model in enumerate(modelPar1.models):
            self.assertTrue(np.allclose(model.coefficients.toArray(),
                                        modelPar2.models[i].coefficients.toArray(), atol=1E-4))
            self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))

    def test_support_for_weightCol(self):
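        # OneVsRest should accept a weightCol whether or not the underlying
        # classifier supports instance weights.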
        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0),
                                         (1.0, Vectors.sparse(2, [], []), 1.0),
                                         (2.0, Vectors.dense(0.5, 0.5), 1.0)],
                                        ["label", "features", "weight"])

        # LogisticRegression exposes its own weightCol param, so the weights are used.
        lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr = OneVsRest(classifier=lr, weightCol="weight")
        self.assertIsNotNone(ovr.fit(df))

        # FMClassifier has no weightCol param; fitting should still succeed,
        # with OneVsRest expected to ignore the weights rather than fail.
        fm = FMClassifier()
        ovr2 = OneVsRest(classifier=fm, weightCol="weight")
        self.assertIsNotNone(ovr2.fit(df))


class KMeansTests(SparkSessionTestCase):

    def test_kmeans_cosine_distance(self):
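        # With cosine distance, points pointing in (roughly) the same direction
        # should share a cluster regardless of magnitude, so each of the three
        # pairs below is expected to get the same cluster id.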
        data = [(Vectors.dense([1.0, 1.0]),), (Vectors.dense([10.0, 10.0]),),
                (Vectors.dense([1.0, 0.5]),), (Vectors.dense([10.0, 4.4]),),
                (Vectors.dense([-1.0, 1.0]),), (Vectors.dense([-100.0, 90.0]),)]
        df = self.spark.createDataFrame(data, ["features"])
        kmeans = KMeans(k=3, seed=1, distanceMeasure="cosine")
        model = kmeans.fit(df)
        result = model.transform(df).collect()
        self.assertEqual(result[0].prediction, result[1].prediction)
        self.assertEqual(result[2].prediction, result[3].prediction)
        self.assertEqual(result[4].prediction, result[5].prediction)


class LDATest(SparkSessionTestCase):

    def _compare(self, m1, m2):
0166 """
0167 Temp method for comparing instances.
0168 TODO: Replace with generic implementation once SPARK-14706 is merged.
0169 """
0170 self.assertEqual(m1.uid, m2.uid)
0171 self.assertEqual(type(m1), type(m2))
0172 self.assertEqual(len(m1.params), len(m2.params))
0173 for p in m1.params:
0174 if m1.isDefined(p):
0175 self.assertEqual(m1.getOrDefault(p), m2.getOrDefault(p))
0176 self.assertEqual(p.parent, m2.getParam(p.name).parent)
0177 if isinstance(m1, LDAModel):
0178 self.assertEqual(m1.vocabSize(), m2.vocabSize())
0179 self.assertEqual(m1.topicsMatrix(), m2.topicsMatrix())

    def test_persistence(self):
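        # Round-trip the LDA estimator, the distributed (EM) model, and its
        # local conversion through save/load, verifying that uid, params, and
        # the topics matrix survive.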
        df = self.spark.createDataFrame([
            [1, Vectors.dense([0.0, 1.0])],
            [2, Vectors.sparse(2, {0: 1.0})],
        ], ["id", "features"])

        lda = LDA(k=2, seed=1, optimizer="em")
        distributedModel = lda.fit(df)
        self.assertTrue(distributedModel.isDistributed())
        localModel = distributedModel.toLocal()
        self.assertFalse(localModel.isDistributed())

        path = tempfile.mkdtemp()
        lda_path = path + "/lda"
        dist_model_path = path + "/distLDAModel"
        local_model_path = path + "/localLDAModel"

        lda.save(lda_path)
        lda2 = LDA.load(lda_path)
        self._compare(lda, lda2)

        distributedModel.save(dist_model_path)
        distributedModel2 = DistributedLDAModel.load(dist_model_path)
        self._compare(distributedModel, distributedModel2)

        localModel.save(local_model_path)
        localModel2 = LocalLDAModel.load(local_model_path)
        self._compare(localModel, localModel2)

        try:
            rmtree(path)
        except OSError:
            pass


class FPGrowthTests(SparkSessionTestCase):
    def setUp(self):
        super(FPGrowthTests, self).setUp()
        self.data = self.spark.createDataFrame(
            [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )],
            ["items"])

    def test_association_rules(self):
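        # On this data, the rules {3}->{1} and {2}->{1} both have confidence
        # and lift of 1.0. Row order in associationRules is not guaranteed, so
        # equality is checked with subtract() in both directions.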
        fp = FPGrowth()
        fpm = fp.fit(self.data)

        expected_association_rules = self.spark.createDataFrame(
            [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)],
            ["antecedent", "consequent", "confidence", "lift"]
        )
        actual_association_rules = fpm.associationRules

        self.assertEqual(actual_association_rules.subtract(expected_association_rules).count(), 0)
        self.assertEqual(expected_association_rules.subtract(actual_association_rules).count(), 0)

    def test_freq_itemsets(self):
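        # Every itemset meeting the default minSupport should be reported with
        # its absolute frequency; again compare order-insensitively.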
        fp = FPGrowth()
        fpm = fp.fit(self.data)

        expected_freq_itemsets = self.spark.createDataFrame(
            [([1], 4), ([2], 3), ([2, 1], 3), ([3], 2), ([3, 1], 2)],
            ["items", "freq"]
        )
        actual_freq_itemsets = fpm.freqItemsets

        self.assertEqual(actual_freq_itemsets.subtract(expected_freq_itemsets).count(), 0)
        self.assertEqual(expected_freq_itemsets.subtract(actual_freq_itemsets).count(), 0)

    def tearDown(self):
        del self.data


class ALSTest(SparkSessionTestCase):

    def test_storage_levels(self):
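        # The default storage levels should be MEMORY_AND_DISK on both the
        # Python wrapper and the underlying Java object, and explicit setters
        # must propagate to both sides.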
        df = self.spark.createDataFrame(
            [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
            ["user", "item", "rating"])
        als = ALS().setMaxIter(1).setRank(1)

        als.fit(df)
        self.assertEqual(als.getIntermediateStorageLevel(), "MEMORY_AND_DISK")
        self.assertEqual(als._java_obj.getIntermediateStorageLevel(), "MEMORY_AND_DISK")
        self.assertEqual(als.getFinalStorageLevel(), "MEMORY_AND_DISK")
        self.assertEqual(als._java_obj.getFinalStorageLevel(), "MEMORY_AND_DISK")

        als.setIntermediateStorageLevel("MEMORY_ONLY_2")
        als.setFinalStorageLevel("DISK_ONLY")
        als.fit(df)
        self.assertEqual(als.getIntermediateStorageLevel(), "MEMORY_ONLY_2")
        self.assertEqual(als._java_obj.getIntermediateStorageLevel(), "MEMORY_ONLY_2")
        self.assertEqual(als.getFinalStorageLevel(), "DISK_ONLY")
        self.assertEqual(als._java_obj.getFinalStorageLevel(), "DISK_ONLY")


class GeneralizedLinearRegressionTest(SparkSessionTestCase):

    def test_tweedie_distribution(self):
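        # Fit a Tweedie-family GLM with variancePower=1.6, first with the
        # default link, then refit with an explicit linkPower=-1.0; both
        # solutions are checked against precomputed reference values.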
        df = self.spark.createDataFrame(
            [(1.0, Vectors.dense(0.0, 0.0)),
             (1.0, Vectors.dense(1.0, 2.0)),
             (2.0, Vectors.dense(0.0, 0.0)),
             (2.0, Vectors.dense(1.0, 1.0))], ["label", "features"])

        glr = GeneralizedLinearRegression(family="tweedie", variancePower=1.6)
        model = glr.fit(df)
        self.assertTrue(np.allclose(model.coefficients.toArray(), [-0.4645, 0.3402], atol=1E-4))
        self.assertTrue(np.isclose(model.intercept, 0.7841, atol=1E-4))

        model2 = glr.setLinkPower(-1.0).fit(df)
        self.assertTrue(np.allclose(model2.coefficients.toArray(), [-0.6667, 0.5], atol=1E-4))
        self.assertTrue(np.isclose(model2.intercept, 0.6667, atol=1E-4))

    def test_offset(self):
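        # Poisson regression with both instance weights and a per-row offset
        # column; coefficients and intercept are checked against reference values.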
        df = self.spark.createDataFrame(
            [(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0)),
             (0.5, 2.1, 0.5, Vectors.dense(1.0, 2.0)),
             (0.9, 0.4, 1.0, Vectors.dense(2.0, 1.0)),
             (0.7, 0.7, 0.0, Vectors.dense(3.0, 3.0))], ["label", "weight", "offset", "features"])

        glr = GeneralizedLinearRegression(family="poisson", weightCol="weight", offsetCol="offset")
        model = glr.fit(df)
        self.assertTrue(np.allclose(model.coefficients.toArray(), [0.664647, -0.3192581],
                                    atol=1E-4))
        self.assertTrue(np.isclose(model.intercept, -1.561613, atol=1E-4))


class LinearRegressionTest(SparkSessionTestCase):

    def test_linear_regression_with_huber_loss(self):
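        # Huber loss is quadratic for small residuals and linear beyond the
        # epsilon threshold; besides coefficients and intercept, the fitted
        # model exposes the estimated scale (sigma), which is also checked.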
        data_path = "data/mllib/sample_linear_regression_data.txt"
        df = self.spark.read.format("libsvm").load(data_path)

        lir = LinearRegression(loss="huber", epsilon=2.0)
        model = lir.fit(df)

        expectedCoefficients = [0.136, 0.7648, -0.7761, 2.4236, 0.537,
                                1.2612, -0.333, -0.5694, -0.6311, 0.6053]
        expectedIntercept = 0.1607
        expectedScale = 9.758

        self.assertTrue(
            np.allclose(model.coefficients.toArray(), expectedCoefficients, atol=1E-3))
        self.assertTrue(np.isclose(model.intercept, expectedIntercept, atol=1E-3))
        self.assertTrue(np.isclose(model.scale, expectedScale, atol=1E-3))


if __name__ == "__main__":
    from pyspark.ml.tests.test_algorithms import *

    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)