Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import tempfile
0019 import unittest
0020 
0021 from pyspark.ml.feature import HashingTF, Tokenizer
0022 from pyspark.ml import Estimator, Pipeline, Model
0023 from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
0024 from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
0025     MulticlassClassificationEvaluator, RegressionEvaluator
0026 from pyspark.ml.linalg import Vectors
0027 from pyspark.ml.param import Param, Params
0028 from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
0029     TrainValidationSplit, TrainValidationSplitModel
0030 from pyspark.sql.functions import rand
0031 from pyspark.testing.mlutils import SparkSessionTestCase
0032 
0033 
class HasInducedError(Params):
    """Params mixin declaring an ``inducedError`` parameter for test estimators."""

    def __init__(self):
        super(HasInducedError, self).__init__()
        # Register the param on this Params instance so it participates in
        # copy()/​_copyValues() like any built-in param.
        self.inducedError = Param(
            self, "inducedError", "Uniformly-distributed error added to feature")

    def getInducedError(self):
        """Return the set value of ``inducedError``, or its default."""
        return self.getOrDefault(self.inducedError)
0043 
0044 
class InducedErrorModel(Model, HasInducedError):
    """Test model that predicts ``feature`` plus uniform noise scaled by
    ``inducedError``."""

    def __init__(self):
        super(InducedErrorModel, self).__init__()

    def _transform(self, dataset):
        # rand(0) is explicitly seeded, so the injected error is deterministic
        # across invocations for a given dataset partitioning.
        noise = rand(0) * self.getInducedError()
        return dataset.withColumn("prediction", dataset.feature + noise)
0053 
0054 
class InducedErrorEstimator(Estimator, HasInducedError):
    """Test estimator whose fitted model simply inherits its ``inducedError``
    setting; the training data is ignored."""

    def __init__(self, inducedError=1.0):
        super(InducedErrorEstimator, self).__init__()
        self._set(inducedError=inducedError)

    def _fit(self, dataset):
        # "Fitting" only propagates param values onto a fresh model.
        fitted = InducedErrorModel()
        self._copyValues(fitted)
        return fitted
0065 
0066 
class ParamGridBuilderTests(SparkSessionTestCase):
    """Input-validation tests for ParamGridBuilder."""

    def test_addGrid(self):
        """addGrid must raise TypeError when the key is not a Param instance."""
        # The original bound the (never produced) result to an unused local
        # `grid`; the call raises before any assignment, so drop the binding.
        with self.assertRaises(TypeError):
            ParamGridBuilder() \
                .addGrid("must be an instance of Param", ["not", "string"]) \
                .build()
0074 
0075 
class CrossValidatorTests(SparkSessionTestCase):
    """Tests for CrossValidator/CrossValidatorModel: copying, metric
    minimization/maximization, param type coercion, parallel evaluation,
    sub-model collection, and save/load (simple, nested, and pipeline
    estimators)."""

    def test_copy(self):
        """copy() preserves the estimator uid and the averaged CV metrics."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="rmse")

        grid = (ParamGridBuilder()
                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                .build())
        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        cvCopied = cv.copy()
        self.assertEqual(cv.getEstimator().uid, cvCopied.getEstimator().uid)

        cvModel = cv.fit(dataset)
        cvModelCopied = cvModel.copy()
        # avgMetrics are floats; compare with a small absolute tolerance.
        for index in range(len(cvModel.avgMetrics)):
            self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index])
                            < 0.0001)

    def test_fit_minimize_metric(self):
        """With rmse (smaller is better) the zero-induced-error model wins."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="rmse")

        grid = (ParamGridBuilder()
                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                .build())
        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        bestModel = cvModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")

    def test_fit_maximize_metric(self):
        """With r2 (larger is better) the zero-induced-error model wins."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="r2")

        grid = (ParamGridBuilder()
                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                .build())
        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        bestModel = cvModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")

    def test_param_grid_type_coercion(self):
        """Grid values for a float param (regParam) are coerced to float,
        even when given as ints."""
        lr = LogisticRegression(maxIter=10)
        paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.5, 1]).build()
        for param in paramGrid:
            for v in param.values():
                assert(type(v) == float)

    def test_save_load_trained_model(self):
        # This tests saving and loading the trained model only.
        # Save/load for CrossValidator will be added later: SPARK-13786
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        lrModel = cvModel.bestModel

        # Round-trip the best LogisticRegressionModel and compare identity/fit.
        cvModelPath = temp_path + "/cvModel"
        lrModel.save(cvModelPath)
        loadedLrModel = LogisticRegressionModel.load(cvModelPath)
        self.assertEqual(loadedLrModel.uid, lrModel.uid)
        self.assertEqual(loadedLrModel.intercept, lrModel.intercept)

    def test_save_load_simple_estimator(self):
        """Save/load round-trip for CrossValidator and CrossValidatorModel
        with a plain LogisticRegression estimator."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()

        # test save/load of CrossValidator
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        cvPath = temp_path + "/cv"
        cv.save(cvPath)
        loadedCV = CrossValidator.load(cvPath)
        self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
        self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)
        self.assertEqual(loadedCV.getEstimatorParamMaps(), cv.getEstimatorParamMaps())

        # test save/load of CrossValidatorModel
        cvModelPath = temp_path + "/cvModel"
        cvModel.save(cvModelPath)
        loadedModel = CrossValidatorModel.load(cvModelPath)
        self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)

    def test_parallel_evaluation(self):
        """Fitting with parallelism=1 and parallelism=2 must produce the
        same average metrics."""
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
        evaluator = BinaryClassificationEvaluator()

        # test save/load of CrossValidator
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        cv.setParallelism(1)
        cvSerialModel = cv.fit(dataset)
        cv.setParallelism(2)
        cvParallelModel = cv.fit(dataset)
        self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)

    def test_expose_sub_models(self):
        """collectSubModels=True exposes one model per (fold, grid point),
        and sub-models survive save/load unless persistSubModels=false."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()

        numFolds = 3
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                            numFolds=numFolds, collectSubModels=True)

        def checkSubModels(subModels):
            # Expect numFolds rows, each holding one model per grid point.
            self.assertEqual(len(subModels), numFolds)
            for i in range(numFolds):
                self.assertEqual(len(subModels[i]), len(grid))

        cvModel = cv.fit(dataset)
        checkSubModels(cvModel.subModels)

        # Test the default value for option "persistSubModel" to be "true"
        testSubPath = temp_path + "/testCrossValidatorSubModels"
        savingPathWithSubModels = testSubPath + "cvModel3"
        cvModel.save(savingPathWithSubModels)
        cvModel3 = CrossValidatorModel.load(savingPathWithSubModels)
        checkSubModels(cvModel3.subModels)
        cvModel4 = cvModel3.copy()
        checkSubModels(cvModel4.subModels)

        # With persistSubModels disabled, the reloaded model has no subModels.
        savingPathWithoutSubModels = testSubPath + "cvModel2"
        cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
        cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels)
        self.assertEqual(cvModel2.subModels, None)

        for i in range(numFolds):
            for j in range(len(grid)):
                self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid)

    def test_save_load_nested_estimator(self):
        """Save/load when the tuned estimator itself wraps another estimator
        (OneVsRest over LogisticRegression); param maps containing estimator
        values are compared by uid."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        ova = OneVsRest(classifier=LogisticRegression())
        lr1 = LogisticRegression().setMaxIter(100)
        lr2 = LogisticRegression().setMaxIter(150)
        grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
        evaluator = MulticlassClassificationEvaluator()

        # test save/load of CrossValidator
        cv = CrossValidator(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        cvPath = temp_path + "/cv"
        cv.save(cvPath)
        loadedCV = CrossValidator.load(cvPath)
        self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
        self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)

        # Estimator-valued entries (the classifier) can only be compared by
        # uid; everything else must round-trip by value.
        originalParamMap = cv.getEstimatorParamMaps()
        loadedParamMap = loadedCV.getEstimatorParamMaps()
        for i, param in enumerate(loadedParamMap):
            for p in param:
                if p.name == "classifier":
                    self.assertEqual(param[p].uid, originalParamMap[i][p].uid)
                else:
                    self.assertEqual(param[p], originalParamMap[i][p])

        # test save/load of CrossValidatorModel
        cvModelPath = temp_path + "/cvModel"
        cvModel.save(cvModelPath)
        loadedModel = CrossValidatorModel.load(cvModelPath)
        self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)

    def test_save_load_pipeline_estimator(self):
        """Save/load when the tuned estimator is a Pipeline, including a
        nested Pipeline-within-Pipeline; stage identity is checked by uid."""
        temp_path = tempfile.mkdtemp()
        training = self.spark.createDataFrame([
            (0, "a b c d e spark", 1.0),
            (1, "b d", 0.0),
            (2, "spark f g h", 1.0),
            (3, "hadoop mapreduce", 0.0),
            (4, "b spark who", 1.0),
            (5, "g d a y", 0.0),
            (6, "spark fly", 1.0),
            (7, "was mapreduce", 0.0),
        ], ["id", "text", "label"])

        # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

        ova = OneVsRest(classifier=LogisticRegression())
        lr1 = LogisticRegression().setMaxIter(5)
        lr2 = LogisticRegression().setMaxIter(10)

        pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])

        paramGrid = ParamGridBuilder() \
            .addGrid(hashingTF.numFeatures, [10, 100]) \
            .addGrid(ova.classifier, [lr1, lr2]) \
            .build()

        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=MulticlassClassificationEvaluator(),
                                  numFolds=2)  # use 3+ folds in practice

        # Run cross-validation, and choose the best set of parameters.
        cvModel = crossval.fit(training)

        # test save/load of CrossValidatorModel
        cvModelPath = temp_path + "/cvModel"
        cvModel.save(cvModelPath)
        loadedModel = CrossValidatorModel.load(cvModelPath)
        self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
        self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages))
        for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
                                              cvModel.bestModel.stages):
            self.assertEqual(loadedStage.uid, originalStage.uid)

        # Test nested pipeline
        nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
        crossval2 = CrossValidator(estimator=nested_pipeline,
                                   estimatorParamMaps=paramGrid,
                                   evaluator=MulticlassClassificationEvaluator(),
                                   numFolds=2)  # use 3+ folds in practice

        # Run cross-validation, and choose the best set of parameters.
        cvModel2 = crossval2.fit(training)
        # test save/load of CrossValidatorModel
        cvModelPath2 = temp_path + "/cvModel2"
        cvModel2.save(cvModelPath2)
        loadedModel2 = CrossValidatorModel.load(cvModelPath2)
        self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
        loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
        original_nested_pipeline_model = cvModel2.bestModel.stages[1]
        self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
        self.assertEqual(len(loaded_nested_pipeline_model.stages),
                         len(original_nested_pipeline_model.stages))
        for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
                                              original_nested_pipeline_model.stages):
            self.assertEqual(loadedStage.uid, originalStage.uid)
0382 
0383 
class TrainValidationSplitTests(SparkSessionTestCase):
    """Tests for TrainValidationSplit/TrainValidationSplitModel, mirroring
    the CrossValidator suite: metric min/max selection, save/load (simple,
    nested, pipeline), parallel evaluation, sub-models, and copy()."""

    def test_fit_minimize_metric(self):
        """With rmse (smaller is better) the zero-induced-error model wins,
        and validationMetrics has one entry per grid point."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="rmse")

        grid = ParamGridBuilder() \
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
            .build()
        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        bestModel = tvsModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
        validationMetrics = tvsModel.validationMetrics

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
        self.assertEqual(len(grid), len(validationMetrics),
                         "validationMetrics has the same size of grid parameter")
        self.assertEqual(0.0, min(validationMetrics))

    def test_fit_maximize_metric(self):
        """With r2 (larger is better) the zero-induced-error model wins."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="r2")

        grid = ParamGridBuilder() \
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
            .build()
        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        bestModel = tvsModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
        validationMetrics = tvsModel.validationMetrics

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
        self.assertEqual(len(grid), len(validationMetrics),
                         "validationMetrics has the same size of grid parameter")
        self.assertEqual(1.0, max(validationMetrics))

    def test_save_load_trained_model(self):
        # This tests saving and loading the trained model only.
        # Save/load for TrainValidationSplit will be added later: SPARK-13786
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        lrModel = tvsModel.bestModel

        # Round-trip the best LogisticRegressionModel and compare identity/fit.
        tvsModelPath = temp_path + "/tvsModel"
        lrModel.save(tvsModelPath)
        loadedLrModel = LogisticRegressionModel.load(tvsModelPath)
        self.assertEqual(loadedLrModel.uid, lrModel.uid)
        self.assertEqual(loadedLrModel.intercept, lrModel.intercept)

    def test_save_load_simple_estimator(self):
        # This tests saving and loading the trained model only.
        # Save/load for TrainValidationSplit will be added later: SPARK-13786
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)

        # Save/load the (unfitted) TrainValidationSplit itself.
        tvsPath = temp_path + "/tvs"
        tvs.save(tvsPath)
        loadedTvs = TrainValidationSplit.load(tvsPath)
        self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
        self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)
        self.assertEqual(loadedTvs.getEstimatorParamMaps(), tvs.getEstimatorParamMaps())

        # Save/load the fitted TrainValidationSplitModel.
        tvsModelPath = temp_path + "/tvsModel"
        tvsModel.save(tvsModelPath)
        loadedModel = TrainValidationSplitModel.load(tvsModelPath)
        self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)

    def test_parallel_evaluation(self):
        """Fitting with parallelism=1 and parallelism=2 must produce the
        same validation metrics."""
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        tvs.setParallelism(1)
        tvsSerialModel = tvs.fit(dataset)
        tvs.setParallelism(2)
        tvsParallelModel = tvs.fit(dataset)
        self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)

    def test_expose_sub_models(self):
        """collectSubModels=True exposes one model per grid point, and
        sub-models survive save/load unless persistSubModels=false."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                                   collectSubModels=True)
        tvsModel = tvs.fit(dataset)
        self.assertEqual(len(tvsModel.subModels), len(grid))

        # Test the default value for option "persistSubModel" to be "true"
        testSubPath = temp_path + "/testTrainValidationSplitSubModels"
        savingPathWithSubModels = testSubPath + "cvModel3"
        tvsModel.save(savingPathWithSubModels)
        tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels)
        self.assertEqual(len(tvsModel3.subModels), len(grid))
        tvsModel4 = tvsModel3.copy()
        self.assertEqual(len(tvsModel4.subModels), len(grid))

        # With persistSubModels disabled, the reloaded model has no subModels.
        savingPathWithoutSubModels = testSubPath + "cvModel2"
        tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
        tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels)
        self.assertEqual(tvsModel2.subModels, None)

        for i in range(len(grid)):
            self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid)

    def test_save_load_nested_estimator(self):
        # This tests saving and loading the trained model only.
        # Save/load for TrainValidationSplit will be added later: SPARK-13786
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        ova = OneVsRest(classifier=LogisticRegression())
        lr1 = LogisticRegression().setMaxIter(100)
        lr2 = LogisticRegression().setMaxIter(150)
        grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
        evaluator = MulticlassClassificationEvaluator()

        tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        tvsPath = temp_path + "/tvs"
        tvs.save(tvsPath)
        loadedTvs = TrainValidationSplit.load(tvsPath)
        self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
        self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)

        # Estimator-valued entries (the classifier) can only be compared by
        # uid; everything else must round-trip by value.
        originalParamMap = tvs.getEstimatorParamMaps()
        loadedParamMap = loadedTvs.getEstimatorParamMaps()
        for i, param in enumerate(loadedParamMap):
            for p in param:
                if p.name == "classifier":
                    self.assertEqual(param[p].uid, originalParamMap[i][p].uid)
                else:
                    self.assertEqual(param[p], originalParamMap[i][p])

        tvsModelPath = temp_path + "/tvsModel"
        tvsModel.save(tvsModelPath)
        loadedModel = TrainValidationSplitModel.load(tvsModelPath)
        self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)

    def test_save_load_pipeline_estimator(self):
        """Save/load when the tuned estimator is a Pipeline, including a
        nested Pipeline-within-Pipeline; stage identity is checked by uid."""
        temp_path = tempfile.mkdtemp()
        training = self.spark.createDataFrame([
            (0, "a b c d e spark", 1.0),
            (1, "b d", 0.0),
            (2, "spark f g h", 1.0),
            (3, "hadoop mapreduce", 0.0),
            (4, "b spark who", 1.0),
            (5, "g d a y", 0.0),
            (6, "spark fly", 1.0),
            (7, "was mapreduce", 0.0),
        ], ["id", "text", "label"])

        # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

        ova = OneVsRest(classifier=LogisticRegression())
        lr1 = LogisticRegression().setMaxIter(5)
        lr2 = LogisticRegression().setMaxIter(10)

        pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])

        paramGrid = ParamGridBuilder() \
            .addGrid(hashingTF.numFeatures, [10, 100]) \
            .addGrid(ova.classifier, [lr1, lr2]) \
            .build()

        tvs = TrainValidationSplit(estimator=pipeline,
                                   estimatorParamMaps=paramGrid,
                                   evaluator=MulticlassClassificationEvaluator())

        # Run train validation split, and choose the best set of parameters.
        tvsModel = tvs.fit(training)

        # test save/load of CrossValidatorModel
        tvsModelPath = temp_path + "/tvsModel"
        tvsModel.save(tvsModelPath)
        loadedModel = TrainValidationSplitModel.load(tvsModelPath)
        self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
        self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages))
        for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
                                              tvsModel.bestModel.stages):
            self.assertEqual(loadedStage.uid, originalStage.uid)

        # Test nested pipeline
        nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
        tvs2 = TrainValidationSplit(estimator=nested_pipeline,
                                    estimatorParamMaps=paramGrid,
                                    evaluator=MulticlassClassificationEvaluator())

        # Run train validation split, and choose the best set of parameters.
        tvsModel2 = tvs2.fit(training)
        # test save/load of CrossValidatorModel
        tvsModelPath2 = temp_path + "/tvsModel2"
        tvsModel2.save(tvsModelPath2)
        loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
        self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
        loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
        original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
        self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
        self.assertEqual(len(loaded_nested_pipeline_model.stages),
                         len(original_nested_pipeline_model.stages))
        for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
                                              original_nested_pipeline_model.stages):
            self.assertEqual(loadedStage.uid, originalStage.uid)

    def test_copy(self):
        """copy() of both the estimator and the fitted model preserves uids
        and validation metrics."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="r2")

        grid = ParamGridBuilder() \
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
            .build()
        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        tvsCopied = tvs.copy()
        tvsModelCopied = tvsModel.copy()

        self.assertEqual(tvs.getEstimator().uid, tvsCopied.getEstimator().uid,
                         "Copied TrainValidationSplit has the same uid of Estimator")

        self.assertEqual(tvsModel.bestModel.uid, tvsModelCopied.bestModel.uid)
        self.assertEqual(len(tvsModel.validationMetrics),
                         len(tvsModelCopied.validationMetrics),
                         "Copied validationMetrics has the same size of the original")
        for index in range(len(tvsModel.validationMetrics)):
            self.assertEqual(tvsModel.validationMetrics[index],
                             tvsModelCopied.validationMetrics[index])
0680 
0681 
if __name__ == "__main__":
    from pyspark.ml.tests.test_tuning import *

    # Prefer the XML-emitting runner when xmlrunner is installed so CI can
    # collect JUnit-style reports; otherwise fall back to the default runner.
    runner = None
    try:
        import xmlrunner
    except ImportError:
        pass
    else:
        runner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    unittest.main(testRunner=runner, verbosity=2)