0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import tempfile
0019 import unittest
0020
0021 from pyspark.ml.feature import HashingTF, Tokenizer
0022 from pyspark.ml import Estimator, Pipeline, Model
0023 from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel, OneVsRest
0024 from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
0025 MulticlassClassificationEvaluator, RegressionEvaluator
0026 from pyspark.ml.linalg import Vectors
0027 from pyspark.ml.param import Param, Params
0028 from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder, \
0029 TrainValidationSplit, TrainValidationSplitModel
0030 from pyspark.sql.functions import rand
0031 from pyspark.testing.mlutils import SparkSessionTestCase
0032
0033
class HasInducedError(Params):
    """Params mixin exposing an ``inducedError`` parameter.

    The parameter controls the magnitude of uniformly-distributed noise that
    test estimators/models below add to the ``feature`` column.
    """

    def __init__(self):
        super(HasInducedError, self).__init__()
        # Declare the param on this instance; the description string is part
        # of the param's public metadata.
        description = "Uniformly-distributed error added to feature"
        self.inducedError = Param(self, "inducedError", description)

    def getInducedError(self):
        """Return the current (or default) value of ``inducedError``."""
        return self.getOrDefault(self.inducedError)
0043
0044
class InducedErrorModel(Model, HasInducedError):
    """Model that predicts ``feature`` plus seeded uniform noise.

    The noise magnitude is the ``inducedError`` param; ``rand(0)`` is seeded
    so repeated transforms are deterministic.
    """

    def __init__(self):
        super(InducedErrorModel, self).__init__()

    def _transform(self, dataset):
        # prediction = feature + U(0, 1) * inducedError
        noise = rand(0) * self.getInducedError()
        return dataset.withColumn("prediction", dataset.feature + noise)
0053
0054
class InducedErrorEstimator(Estimator, HasInducedError):
    """Trivial estimator whose fitted model injects a configurable error.

    Used by the tuning tests: a grid over ``inducedError`` lets the tests
    verify that the tuner picks the setting that optimizes the metric.
    """

    def __init__(self, inducedError=1.0):
        super(InducedErrorEstimator, self).__init__()
        self._set(inducedError=inducedError)

    def _fit(self, dataset):
        # Fitting ignores the data entirely; just propagate param values
        # (including inducedError) onto a fresh model instance.
        fitted = InducedErrorModel()
        self._copyValues(fitted)
        return fitted
0065
0066
class ParamGridBuilderTests(SparkSessionTestCase):
    """Tests for ParamGridBuilder input validation."""

    def test_addGrid(self):
        """addGrid must raise TypeError when the key is not a Param."""
        builder = ParamGridBuilder()
        with self.assertRaises(TypeError):
            builder.addGrid("must be an instance of Param", ["not", "string"]).build()
0074
0075
class CrossValidatorTests(SparkSessionTestCase):
    """Tests for CrossValidator / CrossValidatorModel.

    Covers copying, metric optimization direction, param-grid type coercion,
    persistence (simple, nested, and pipeline estimators), parallel
    evaluation, and exposure of per-fold sub-models.
    """

    def test_copy(self):
        """copy() preserves the estimator uid and the fitted avgMetrics."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="rmse")

        grid = (ParamGridBuilder()
                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                .build())
        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        cvCopied = cv.copy()
        self.assertEqual(cv.getEstimator().uid, cvCopied.getEstimator().uid)

        cvModel = cv.fit(dataset)
        cvModelCopied = cvModel.copy()
        # The copied model's average metrics must match numerically
        # (compared with a small float tolerance).
        for index in range(len(cvModel.avgMetrics)):
            self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index])
                            < 0.0001)

    def test_fit_minimize_metric(self):
        """rmse is minimized, so CV must select the zero-induced-error model."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="rmse")

        grid = (ParamGridBuilder()
                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                .build())
        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        bestModel = cvModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")

    def test_fit_maximize_metric(self):
        """r2 is maximized, so CV must still select the zero-error model."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="r2")

        grid = (ParamGridBuilder()
                .addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
                .build())
        cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        bestModel = cvModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")

    def test_param_grid_type_coercion(self):
        """Int values supplied for a float param are coerced to float."""
        lr = LogisticRegression(maxIter=10)
        # regParam is a float param; the grid mixes an int (1) with a float.
        paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.5, 1]).build()
        for param in paramGrid:
            for v in param.values():
                assert(type(v) == float)

    def test_save_load_trained_model(self):
        """The best model from a fitted CV can be saved and reloaded.

        This persists only the inner best model (a LogisticRegressionModel);
        CrossValidator/CrossValidatorModel persistence is covered separately.
        """
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        lrModel = cvModel.bestModel

        cvModelPath = temp_path + "/cvModel"
        lrModel.save(cvModelPath)
        loadedLrModel = LogisticRegressionModel.load(cvModelPath)
        self.assertEqual(loadedLrModel.uid, lrModel.uid)
        self.assertEqual(loadedLrModel.intercept, lrModel.intercept)

    def test_save_load_simple_estimator(self):
        """Round-trip a CrossValidator and its fitted model through save/load."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()

        # Save/load the (unfitted) estimator: uids and param maps must survive.
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        cvPath = temp_path + "/cv"
        cv.save(cvPath)
        loadedCV = CrossValidator.load(cvPath)
        self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
        self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)
        self.assertEqual(loadedCV.getEstimatorParamMaps(), cv.getEstimatorParamMaps())

        # Save/load the fitted model: the wrapped best model must survive.
        cvModelPath = temp_path + "/cvModel"
        cvModel.save(cvModelPath)
        loadedModel = CrossValidatorModel.load(cvModelPath)
        self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)

    def test_parallel_evaluation(self):
        """Serial and parallel fitting must produce identical avgMetrics."""
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
        evaluator = BinaryClassificationEvaluator()

        # Fit once serially and once with parallelism=2, then compare metrics.
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        cv.setParallelism(1)
        cvSerialModel = cv.fit(dataset)
        cv.setParallelism(2)
        cvParallelModel = cv.fit(dataset)
        self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)

    def test_expose_sub_models(self):
        """collectSubModels=True exposes one model per (fold, grid point)."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()

        numFolds = 3
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                            numFolds=numFolds, collectSubModels=True)

        def checkSubModels(subModels):
            # Expect a numFolds x len(grid) matrix of fitted sub-models.
            self.assertEqual(len(subModels), numFolds)
            for i in range(numFolds):
                self.assertEqual(len(subModels[i]), len(grid))

        cvModel = cv.fit(dataset)
        checkSubModels(cvModel.subModels)

        # Saving with sub-models (the default) preserves them through
        # load and copy.
        testSubPath = temp_path + "/testCrossValidatorSubModels"
        savingPathWithSubModels = testSubPath + "cvModel3"
        cvModel.save(savingPathWithSubModels)
        cvModel3 = CrossValidatorModel.load(savingPathWithSubModels)
        checkSubModels(cvModel3.subModels)
        cvModel4 = cvModel3.copy()
        checkSubModels(cvModel4.subModels)

        # Saving with persistSubModels=false drops them: loaded model has None.
        savingPathWithoutSubModels = testSubPath + "cvModel2"
        cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
        cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels)
        self.assertEqual(cvModel2.subModels, None)

        # Reloaded sub-models keep the original uids.
        for i in range(numFolds):
            for j in range(len(grid)):
                self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid)

    def test_save_load_nested_estimator(self):
        """Persistence works when the grid varies a nested estimator param
        (OneVsRest's classifier), not just primitive values."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        ova = OneVsRest(classifier=LogisticRegression())
        lr1 = LogisticRegression().setMaxIter(100)
        lr2 = LogisticRegression().setMaxIter(150)
        grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
        evaluator = MulticlassClassificationEvaluator()

        # Round-trip the estimator itself.
        cv = CrossValidator(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
        cvModel = cv.fit(dataset)
        cvPath = temp_path + "/cv"
        cv.save(cvPath)
        loadedCV = CrossValidator.load(cvPath)
        self.assertEqual(loadedCV.getEstimator().uid, cv.getEstimator().uid)
        self.assertEqual(loadedCV.getEvaluator().uid, cv.getEvaluator().uid)

        # Estimator param values (LogisticRegression instances) are compared
        # by uid; plain values are compared directly.
        originalParamMap = cv.getEstimatorParamMaps()
        loadedParamMap = loadedCV.getEstimatorParamMaps()
        for i, param in enumerate(loadedParamMap):
            for p in param:
                if p.name == "classifier":
                    self.assertEqual(param[p].uid, originalParamMap[i][p].uid)
                else:
                    self.assertEqual(param[p], originalParamMap[i][p])

        # Round-trip the fitted model.
        cvModelPath = temp_path + "/cvModel"
        cvModel.save(cvModelPath)
        loadedModel = CrossValidatorModel.load(cvModelPath)
        self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)

    def test_save_load_pipeline_estimator(self):
        """Persistence works for Pipeline estimators, including a pipeline
        nested inside another pipeline."""
        temp_path = tempfile.mkdtemp()
        training = self.spark.createDataFrame([
            (0, "a b c d e spark", 1.0),
            (1, "b d", 0.0),
            (2, "spark f g h", 1.0),
            (3, "hadoop mapreduce", 0.0),
            (4, "b spark who", 1.0),
            (5, "g d a y", 0.0),
            (6, "spark fly", 1.0),
            (7, "was mapreduce", 0.0),
        ], ["id", "text", "label"])

        # Feature stages: text -> tokens -> hashed term frequencies.
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

        ova = OneVsRest(classifier=LogisticRegression())
        lr1 = LogisticRegression().setMaxIter(5)
        lr2 = LogisticRegression().setMaxIter(10)

        pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])

        # Grid varies both a primitive param and a nested estimator param.
        paramGrid = ParamGridBuilder() \
            .addGrid(hashingTF.numFeatures, [10, 100]) \
            .addGrid(ova.classifier, [lr1, lr2]) \
            .build()

        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=MulticlassClassificationEvaluator(),
                                  numFolds=2)

        # Run cross-validation on the flat pipeline.
        cvModel = crossval.fit(training)

        # Round-trip: best model and each of its stages keep their uids.
        cvModelPath = temp_path + "/cvModel"
        cvModel.save(cvModelPath)
        loadedModel = CrossValidatorModel.load(cvModelPath)
        self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)
        self.assertEqual(len(loadedModel.bestModel.stages), len(cvModel.bestModel.stages))
        for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
                                              cvModel.bestModel.stages):
            self.assertEqual(loadedStage.uid, originalStage.uid)

        # Same checks for a pipeline nested inside another pipeline.
        nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
        crossval2 = CrossValidator(estimator=nested_pipeline,
                                   estimatorParamMaps=paramGrid,
                                   evaluator=MulticlassClassificationEvaluator(),
                                   numFolds=2)

        cvModel2 = crossval2.fit(training)

        cvModelPath2 = temp_path + "/cvModel2"
        cvModel2.save(cvModelPath2)
        loadedModel2 = CrossValidatorModel.load(cvModelPath2)
        self.assertEqual(loadedModel2.bestModel.uid, cvModel2.bestModel.uid)
        loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
        original_nested_pipeline_model = cvModel2.bestModel.stages[1]
        self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
        self.assertEqual(len(loaded_nested_pipeline_model.stages),
                         len(original_nested_pipeline_model.stages))
        for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
                                              original_nested_pipeline_model.stages):
            self.assertEqual(loadedStage.uid, originalStage.uid)
0382
0383
class TrainValidationSplitTests(SparkSessionTestCase):
    """Tests for TrainValidationSplit / TrainValidationSplitModel.

    Mirrors the CrossValidator tests: metric optimization direction,
    persistence (simple, nested, and pipeline estimators), parallel
    evaluation, sub-model exposure, and copying.
    """

    def test_fit_minimize_metric(self):
        """rmse is minimized, so TVS must select the zero-induced-error model."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="rmse")

        grid = ParamGridBuilder() \
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
            .build()
        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        bestModel = tvsModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
        validationMetrics = tvsModel.validationMetrics

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
        # One validation metric per grid point, and the best (minimum) is 0.
        self.assertEqual(len(grid), len(validationMetrics),
                         "validationMetrics has the same size of grid parameter")
        self.assertEqual(0.0, min(validationMetrics))

    def test_fit_maximize_metric(self):
        """r2 is maximized, so TVS must still select the zero-error model."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="r2")

        grid = ParamGridBuilder() \
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
            .build()
        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        bestModel = tvsModel.bestModel
        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
        validationMetrics = tvsModel.validationMetrics

        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
                         "Best model should have zero induced error")
        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
        # One validation metric per grid point, and the best (maximum) is 1.
        self.assertEqual(len(grid), len(validationMetrics),
                         "validationMetrics has the same size of grid parameter")
        self.assertEqual(1.0, max(validationMetrics))

    def test_save_load_trained_model(self):
        """The best model from a fitted TVS can be saved and reloaded.

        This persists only the inner best model (a LogisticRegressionModel);
        TrainValidationSplit(Model) persistence is covered separately.
        """
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        lrModel = tvsModel.bestModel

        tvsModelPath = temp_path + "/tvsModel"
        lrModel.save(tvsModelPath)
        loadedLrModel = LogisticRegressionModel.load(tvsModelPath)
        self.assertEqual(loadedLrModel.uid, lrModel.uid)
        self.assertEqual(loadedLrModel.intercept, lrModel.intercept)

    def test_save_load_simple_estimator(self):
        """Round-trip a TrainValidationSplit and its fitted model."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)

        # Estimator round-trip: uids and param maps must survive.
        tvsPath = temp_path + "/tvs"
        tvs.save(tvsPath)
        loadedTvs = TrainValidationSplit.load(tvsPath)
        self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
        self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)
        self.assertEqual(loadedTvs.getEstimatorParamMaps(), tvs.getEstimatorParamMaps())

        # Fitted-model round-trip: the wrapped best model must survive.
        tvsModelPath = temp_path + "/tvsModel"
        tvsModel.save(tvsModelPath)
        loadedModel = TrainValidationSplitModel.load(tvsModelPath)
        self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)

    def test_parallel_evaluation(self):
        """Serial and parallel fitting must produce identical metrics."""
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
        tvs.setParallelism(1)
        tvsSerialModel = tvs.fit(dataset)
        tvs.setParallelism(2)
        tvsParallelModel = tvs.fit(dataset)
        self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)

    def test_expose_sub_models(self):
        """collectSubModels=True exposes one sub-model per grid point."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                                   collectSubModels=True)
        tvsModel = tvs.fit(dataset)
        self.assertEqual(len(tvsModel.subModels), len(grid))

        # Saving with sub-models (the default) preserves them through
        # load and copy.
        testSubPath = temp_path + "/testTrainValidationSplitSubModels"
        savingPathWithSubModels = testSubPath + "cvModel3"
        tvsModel.save(savingPathWithSubModels)
        tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels)
        self.assertEqual(len(tvsModel3.subModels), len(grid))
        tvsModel4 = tvsModel3.copy()
        self.assertEqual(len(tvsModel4.subModels), len(grid))

        # Saving with persistSubModels=false drops them: loaded model has None.
        savingPathWithoutSubModels = testSubPath + "cvModel2"
        tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
        tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels)
        self.assertEqual(tvsModel2.subModels, None)

        # Reloaded sub-models keep the original uids.
        for i in range(len(grid)):
            self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid)

    def test_save_load_nested_estimator(self):
        """Persistence works when the grid varies a nested estimator param
        (OneVsRest's classifier), not just primitive values."""
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        ova = OneVsRest(classifier=LogisticRegression())
        lr1 = LogisticRegression().setMaxIter(100)
        lr2 = LogisticRegression().setMaxIter(150)
        grid = ParamGridBuilder().addGrid(ova.classifier, [lr1, lr2]).build()
        evaluator = MulticlassClassificationEvaluator()

        tvs = TrainValidationSplit(estimator=ova, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        tvsPath = temp_path + "/tvs"
        tvs.save(tvsPath)
        loadedTvs = TrainValidationSplit.load(tvsPath)
        self.assertEqual(loadedTvs.getEstimator().uid, tvs.getEstimator().uid)
        self.assertEqual(loadedTvs.getEvaluator().uid, tvs.getEvaluator().uid)

        # Estimator param values (LogisticRegression instances) are compared
        # by uid; plain values are compared directly.
        originalParamMap = tvs.getEstimatorParamMaps()
        loadedParamMap = loadedTvs.getEstimatorParamMaps()
        for i, param in enumerate(loadedParamMap):
            for p in param:
                if p.name == "classifier":
                    self.assertEqual(param[p].uid, originalParamMap[i][p].uid)
                else:
                    self.assertEqual(param[p], originalParamMap[i][p])

        tvsModelPath = temp_path + "/tvsModel"
        tvsModel.save(tvsModelPath)
        loadedModel = TrainValidationSplitModel.load(tvsModelPath)
        self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)

    def test_save_load_pipeline_estimator(self):
        """Persistence works for Pipeline estimators, including a pipeline
        nested inside another pipeline."""
        temp_path = tempfile.mkdtemp()
        training = self.spark.createDataFrame([
            (0, "a b c d e spark", 1.0),
            (1, "b d", 0.0),
            (2, "spark f g h", 1.0),
            (3, "hadoop mapreduce", 0.0),
            (4, "b spark who", 1.0),
            (5, "g d a y", 0.0),
            (6, "spark fly", 1.0),
            (7, "was mapreduce", 0.0),
        ], ["id", "text", "label"])

        # Feature stages: text -> tokens -> hashed term frequencies.
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

        ova = OneVsRest(classifier=LogisticRegression())
        lr1 = LogisticRegression().setMaxIter(5)
        lr2 = LogisticRegression().setMaxIter(10)

        pipeline = Pipeline(stages=[tokenizer, hashingTF, ova])

        # Grid varies both a primitive param and a nested estimator param.
        paramGrid = ParamGridBuilder() \
            .addGrid(hashingTF.numFeatures, [10, 100]) \
            .addGrid(ova.classifier, [lr1, lr2]) \
            .build()

        tvs = TrainValidationSplit(estimator=pipeline,
                                   estimatorParamMaps=paramGrid,
                                   evaluator=MulticlassClassificationEvaluator())

        # Run train-validation split on the flat pipeline.
        tvsModel = tvs.fit(training)

        # Round-trip: best model and each of its stages keep their uids.
        tvsModelPath = temp_path + "/tvsModel"
        tvsModel.save(tvsModelPath)
        loadedModel = TrainValidationSplitModel.load(tvsModelPath)
        self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)
        self.assertEqual(len(loadedModel.bestModel.stages), len(tvsModel.bestModel.stages))
        for loadedStage, originalStage in zip(loadedModel.bestModel.stages,
                                              tvsModel.bestModel.stages):
            self.assertEqual(loadedStage.uid, originalStage.uid)

        # Same checks for a pipeline nested inside another pipeline.
        nested_pipeline = Pipeline(stages=[tokenizer, Pipeline(stages=[hashingTF, ova])])
        tvs2 = TrainValidationSplit(estimator=nested_pipeline,
                                    estimatorParamMaps=paramGrid,
                                    evaluator=MulticlassClassificationEvaluator())

        tvsModel2 = tvs2.fit(training)

        tvsModelPath2 = temp_path + "/tvsModel2"
        tvsModel2.save(tvsModelPath2)
        loadedModel2 = TrainValidationSplitModel.load(tvsModelPath2)
        self.assertEqual(loadedModel2.bestModel.uid, tvsModel2.bestModel.uid)
        loaded_nested_pipeline_model = loadedModel2.bestModel.stages[1]
        original_nested_pipeline_model = tvsModel2.bestModel.stages[1]
        self.assertEqual(loaded_nested_pipeline_model.uid, original_nested_pipeline_model.uid)
        self.assertEqual(len(loaded_nested_pipeline_model.stages),
                         len(original_nested_pipeline_model.stages))
        for loadedStage, originalStage in zip(loaded_nested_pipeline_model.stages,
                                              original_nested_pipeline_model.stages):
            self.assertEqual(loadedStage.uid, originalStage.uid)

    def test_copy(self):
        """copy() preserves estimator uid, best-model uid, and metrics."""
        dataset = self.spark.createDataFrame([
            (10, 10.0),
            (50, 50.0),
            (100, 100.0),
            (500, 500.0)] * 10,
            ["feature", "label"])

        iee = InducedErrorEstimator()
        evaluator = RegressionEvaluator(metricName="r2")

        grid = ParamGridBuilder() \
            .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
            .build()
        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
        tvsModel = tvs.fit(dataset)
        tvsCopied = tvs.copy()
        tvsModelCopied = tvsModel.copy()

        self.assertEqual(tvs.getEstimator().uid, tvsCopied.getEstimator().uid,
                         "Copied TrainValidationSplit has the same uid of Estimator")

        self.assertEqual(tvsModel.bestModel.uid, tvsModelCopied.bestModel.uid)
        self.assertEqual(len(tvsModel.validationMetrics),
                         len(tvsModelCopied.validationMetrics),
                         "Copied validationMetrics has the same size of the original")
        # Metric values must match element-by-element.
        for index in range(len(tvsModel.validationMetrics)):
            self.assertEqual(tvsModel.validationMetrics[index],
                             tvsModelCopied.validationMetrics[index])
0680
0681
if __name__ == "__main__":
    from pyspark.ml.tests.test_tuning import *

    try:
        # Use XML test reports (e.g. for CI) when xmlrunner is installed.
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        # Fall back to unittest's default text runner.
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)