0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import sys
0019 import warnings
0020
0021 import numpy as np
0022
0023 from pyspark import RDD, since
0024 from pyspark.streaming.dstream import DStream
0025 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
0026 from pyspark.mllib.linalg import SparseVector, _convert_to_vector
0027 from pyspark.mllib.util import Saveable, Loader
0028
# Names exported by ``from pyspark.mllib.regression import *``.
# LinearRegressionModelBase is defined in this module but is not exported.
__all__ = ['LabeledPoint', 'LinearModel',
           'LinearRegressionModel', 'LinearRegressionWithSGD',
           'RidgeRegressionModel', 'RidgeRegressionWithSGD',
           'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
           'IsotonicRegression', 'StreamingLinearAlgorithm',
           'StreamingLinearRegressionWithSGD']
0035
0036
class LabeledPoint(object):

    """
    A single data point: a numeric label paired with its feature vector.

    :param label:
      Label for this data point.
    :param features:
      Vector of features for this point (NumPy array, list,
      pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix).

    .. note:: 'label' and 'features' are accessible as class attributes.

    .. versionadded:: 1.0.0
    """

    def __init__(self, label, features):
        # The label is always stored as a Python float; the features go
        # through _convert_to_vector.
        label_value = float(label)
        self.label = label_value
        self.features = _convert_to_vector(features)

    def __reduce__(self):
        # Pickle support: rebuild through the constructor from (label, features).
        ctor_args = (self.label, self.features)
        return (LabeledPoint, ctor_args)

    def __str__(self):
        # Compact form "(label,features)" with no space after the comma.
        return "(%s,%s)" % (str(self.label), str(self.features))

    def __repr__(self):
        pieces = [str(self.label), str(self.features)]
        return "LabeledPoint(" + ", ".join(pieces) + ")"
0065
0066
class LinearModel(object):

    """
    A linear model described by a coefficient vector and an intercept.

    :param weights:
      Weights computed for every feature.
    :param intercept:
      Intercept computed for this model.

    .. versionadded:: 0.9.0
    """

    def __init__(self, weights, intercept):
        # Weights go through _convert_to_vector; the intercept is stored as
        # a Python float.
        coefficients = _convert_to_vector(weights)
        self._coeff = coefficients
        self._intercept = float(intercept)

    @property
    @since("1.0.0")
    def weights(self):
        """Weights computed for every feature."""
        coefficients = self._coeff
        return coefficients

    @property
    @since("1.0.0")
    def intercept(self):
        """Intercept computed for this model."""
        bias = self._intercept
        return bias

    def __repr__(self):
        # Weights are rendered with str(), the intercept with repr().
        return "(weights=" + str(self._coeff) + ", intercept=" + repr(self._intercept) + ")"
0098
0099
@inherit_doc
class LinearRegressionModelBase(LinearModel):

    """A linear regression model.

    >>> lrmb = LinearRegressionModelBase(np.array([1.0, 2.0]), 0.1)
    >>> abs(lrmb.predict(np.array([-1.03, 7.777])) - 14.624) < 1e-6
    True
    >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
    True

    .. versionadded:: 0.9.0
    """

    @since("0.9.0")
    def predict(self, x):
        """
        Predict the value of the dependent variable given a vector or
        an RDD of vectors containing values for the independent variables.

        A single vector yields ``weights . x + intercept``; an RDD is
        mapped element-wise through this same method.
        """
        if not isinstance(x, RDD):
            point = _convert_to_vector(x)
            return self.weights.dot(point) + self.intercept
        # RDD input: apply the single-vector prediction to every element.
        return x.map(self.predict)
0124
0125
@inherit_doc
class LinearRegressionModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
    True
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> lrm.save(sc, path)
    >>> sameModel = LinearRegressionModel.load(sc, path)
    >>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except:
    ...     pass
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
    ...    miniBatchFraction=1.0, initialWeights=np.array([1.0]), regParam=0.1, regType="l2",
    ...    intercept=True, validateData=True)
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True

    .. versionadded:: 0.9.0
    """
    @since("1.4.0")
    def save(self, sc, path):
        """
        Save this LinearRegressionModel to ``path``.

        A JVM-side LinearRegressionModel is built from the weights and
        intercept, and its own ``save`` does the writing.
        """
        jvm_regression = sc._jvm.org.apache.spark.mllib.regression
        jvm_model = jvm_regression.LinearRegressionModel(
            _py2java(sc, self._coeff), self.intercept)
        jvm_model.save(sc._jsc.sc(), path)

    @classmethod
    @since("1.4.0")
    def load(cls, sc, path):
        """
        Load a LinearRegressionModel from ``path``.

        The JVM-side model is loaded first; its weights and intercept are
        then copied into a new Python LinearRegressionModel.
        """
        jvm_regression = sc._jvm.org.apache.spark.mllib.regression
        jvm_model = jvm_regression.LinearRegressionModel.load(sc._jsc.sc(), path)
        py_weights = _java2py(sc, jvm_model.weights())
        py_intercept = jvm_model.intercept()
        return LinearRegressionModel(py_weights, py_intercept)
0202
0203
0204
0205
0206
def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
    """
    Validate the training data, resolve the initial weights, run
    ``train_func`` and wrap its result in ``modelClass``.

    :param train_func:
      Callable invoked as ``train_func(data, initial_weight_vector)``.
      It must return ``(weights, intercept, numFeatures, numClasses)``
      when ``modelClass`` is LogisticRegressionModel, and
      ``(weights, intercept)`` otherwise.
    :param modelClass:
      Model class instantiated with the values returned by ``train_func``.
    :param data:
      The training data, an RDD of LabeledPoint.
    :param initial_weights:
      The initial weights, or None to start from an all-zero vector with
      one entry per feature of the first record.
    :return:
      An instance of ``modelClass``.
    :raises TypeError:
      If the first element of ``data`` is not a LabeledPoint.
    """
    # Imported here rather than at module level (presumably to avoid a
    # circular import with pyspark.mllib.classification -- TODO confirm).
    from pyspark.mllib.classification import LogisticRegressionModel
    first = data.first()
    if not isinstance(first, LabeledPoint):
        raise TypeError("data should be an RDD of LabeledPoint, but got %s" % type(first))
    if initial_weights is None:
        # Reuse the record fetched above instead of calling data.first()
        # a second time, which would run another Spark action just to
        # read the feature count.
        initial_weights = [0.0] * len(first.features)
    weight_vector = _convert_to_vector(initial_weights)
    if (modelClass == LogisticRegressionModel):
        weights, intercept, numFeatures, numClasses = train_func(data, weight_vector)
        return modelClass(weights, intercept, numFeatures, numClasses)
    else:
        weights, intercept = train_func(data, weight_vector)
        return modelClass(weights, intercept)
0221
0222
class LinearRegressionWithSGD(object):
    """
    Trains least-squares linear regression models with Stochastic
    Gradient Descent.

    .. versionadded:: 0.9.0
    .. note:: Deprecated in 2.0.0. Use ml.regression.LinearRegression.
    """
    @classmethod
    @since("0.9.0")
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=0.0, regType=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a linear regression model using Stochastic Gradient
        Descent (SGD). This solves the least squares regression
        formulation

            f(weights) = 1/(2n) ||A weights - y||^2

        which is the mean squared error. Here the data matrix has n rows,
        and the input RDD holds the set of rows of A, each with its
        corresponding right hand side label y.
        See also the documentation for the precise formulation.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param step:
          The step parameter used in SGD.
          (default: 1.0)
        :param miniBatchFraction:
          Fraction of data to be used for each SGD iteration.
          (default: 1.0)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param regParam:
          The regularizer parameter.
          (default: 0.0)
        :param regType:
          The type of regularizer used for training our model.
          Supported values:

            - "l1" for using L1 regularization
            - "l2" for using L2 regularization
            - None for no regularization (default)
        :param intercept:
          Boolean parameter which indicates the use or not of the
          augmented representation for training data (i.e., whether bias
          features are activated or not).
          (default: False)
        :param validateData:
          Boolean parameter which indicates if the algorithm should
          validate data before training.
          (default: True)
        :param convergenceTol:
          A condition which decides iteration termination.
          (default: 0.001)
        """
        deprecation_message = "Deprecated in 2.0.0. Use ml.regression.LinearRegression."
        warnings.warn(deprecation_message, DeprecationWarning)

        def run_sgd(points, start_weights):
            # Arguments are coerced here, at call time, in the positional
            # order passed to "trainLinearRegressionModelWithSGD".
            return callMLlibFunc(
                "trainLinearRegressionModelWithSGD",
                points,
                int(iterations),
                float(step),
                float(miniBatchFraction),
                start_weights,
                float(regParam),
                regType,
                bool(intercept),
                bool(validateData),
                float(convergenceTol))

        return _regression_train_wrapper(run_sgd, LinearRegressionModel, data, initialWeights)
0292
0293
@inherit_doc
class LassoModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit with
    an l_1 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LassoWithSGD.train(
    ...     sc.parallelize(data), iterations=10, initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
    True
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> lrm.save(sc, path)
    >>> sameModel = LassoModel.load(sc, path)
    >>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except:
    ...     pass
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
    ...     regParam=0.01, miniBatchFraction=1.0, initialWeights=np.array([1.0]), intercept=True,
    ...     validateData=True)
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True

    .. versionadded:: 0.9.0
    """
    @since("1.4.0")
    def save(self, sc, path):
        """
        Save this LassoModel to ``path``.

        A JVM-side LassoModel is built from the weights and intercept,
        and its own ``save`` does the writing.
        """
        jvm_regression = sc._jvm.org.apache.spark.mllib.regression
        jvm_model = jvm_regression.LassoModel(
            _py2java(sc, self._coeff), self.intercept)
        jvm_model.save(sc._jsc.sc(), path)

    @classmethod
    @since("1.4.0")
    def load(cls, sc, path):
        """
        Load a LassoModel from ``path``.

        The JVM-side model is loaded first; its weights and intercept are
        then copied into a new Python LassoModel.
        """
        jvm_regression = sc._jvm.org.apache.spark.mllib.regression
        jvm_model = jvm_regression.LassoModel.load(sc._jsc.sc(), path)
        py_weights = _java2py(sc, jvm_model.weights())
        py_intercept = jvm_model.intercept()
        return LassoModel(py_weights, py_intercept)
0371
0372
class LassoWithSGD(object):
    """
    Trains L1-regularized least-squares regression models with Stochastic
    Gradient Descent.

    .. versionadded:: 0.9.0
    .. note:: Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 1.0.
            Note the default regParam is 0.01 for LassoWithSGD, but is 0.0 for LinearRegression.
    """
    @classmethod
    @since("0.9.0")
    def train(cls, data, iterations=100, step=1.0, regParam=0.01,
              miniBatchFraction=1.0, initialWeights=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a regression model with L1-regularization using Stochastic
        Gradient Descent. This solves the l1-regularized least squares
        regression formulation

            f(weights) = 1/(2n) ||A weights - y||^2  + regParam ||weights||_1

        Here the data matrix has n rows, and the input RDD holds the set
        of rows of A, each with its corresponding right hand side label y.
        See also the documentation for the precise formulation.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param step:
          The step parameter used in SGD.
          (default: 1.0)
        :param regParam:
          The regularizer parameter.
          (default: 0.01)
        :param miniBatchFraction:
          Fraction of data to be used for each SGD iteration.
          (default: 1.0)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param intercept:
          Boolean parameter which indicates the use or not of the
          augmented representation for training data (i.e. whether bias
          features are activated or not).
          (default: False)
        :param validateData:
          Boolean parameter which indicates if the algorithm should
          validate data before training.
          (default: True)
        :param convergenceTol:
          A condition which decides iteration termination.
          (default: 0.001)
        """
        deprecation_message = (
            "Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 1.0. "
            "Note the default regParam is 0.01 for LassoWithSGD, but is 0.0 for LinearRegression.")
        warnings.warn(deprecation_message, DeprecationWarning)

        def run_sgd(points, start_weights):
            # Arguments are coerced here, at call time, in the positional
            # order passed to "trainLassoModelWithSGD".
            return callMLlibFunc(
                "trainLassoModelWithSGD",
                points,
                int(iterations),
                float(step),
                float(regParam),
                float(miniBatchFraction),
                start_weights,
                bool(intercept),
                bool(validateData),
                float(convergenceTol))

        return _regression_train_wrapper(run_sgd, LassoModel, data, initialWeights)
0436
0437
@inherit_doc
class RidgeRegressionModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit with
    an l_2 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
    True
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> lrm.save(sc, path)
    >>> sameModel = RidgeRegressionModel.load(sc, path)
    >>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except:
    ...     pass
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
    ...     regParam=0.01, miniBatchFraction=1.0, initialWeights=np.array([1.0]), intercept=True,
    ...     validateData=True)
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True

    .. versionadded:: 0.9.0
    """
    @since("1.4.0")
    def save(self, sc, path):
        """
        Save this RidgeRegressionModel to ``path``.

        A JVM-side RidgeRegressionModel is built from the weights and
        intercept, and its own ``save`` does the writing.
        """
        jvm_regression = sc._jvm.org.apache.spark.mllib.regression
        jvm_model = jvm_regression.RidgeRegressionModel(
            _py2java(sc, self._coeff), self.intercept)
        jvm_model.save(sc._jsc.sc(), path)

    @classmethod
    @since("1.4.0")
    def load(cls, sc, path):
        """
        Load a RidgeRegressionModel from ``path``.

        The JVM-side model is loaded first; its weights and intercept are
        then copied into a new Python RidgeRegressionModel.
        """
        jvm_regression = sc._jvm.org.apache.spark.mllib.regression
        jvm_model = jvm_regression.RidgeRegressionModel.load(sc._jsc.sc(), path)
        py_weights = _java2py(sc, jvm_model.weights())
        py_intercept = jvm_model.intercept()
        return RidgeRegressionModel(py_weights, py_intercept)
0515
0516
class RidgeRegressionWithSGD(object):
    """
    Trains L2-regularized least-squares regression models with Stochastic
    Gradient Descent.

    .. versionadded:: 0.9.0
    .. note:: Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 0.0.
            Note the default regParam is 0.01 for RidgeRegressionWithSGD, but is 0.0 for
            LinearRegression.
    """
    @classmethod
    @since("0.9.0")
    def train(cls, data, iterations=100, step=1.0, regParam=0.01,
              miniBatchFraction=1.0, initialWeights=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a regression model with L2-regularization using Stochastic
        Gradient Descent. This solves the l2-regularized least squares
        regression formulation

            f(weights) = 1/(2n) ||A weights - y||^2 + regParam/2 ||weights||^2

        Here the data matrix has n rows, and the input RDD holds the set
        of rows of A, each with its corresponding right hand side label y.
        See also the documentation for the precise formulation.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param step:
          The step parameter used in SGD.
          (default: 1.0)
        :param regParam:
          The regularizer parameter.
          (default: 0.01)
        :param miniBatchFraction:
          Fraction of data to be used for each SGD iteration.
          (default: 1.0)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param intercept:
          Boolean parameter which indicates the use or not of the
          augmented representation for training data (i.e. whether bias
          features are activated or not).
          (default: False)
        :param validateData:
          Boolean parameter which indicates if the algorithm should
          validate data before training.
          (default: True)
        :param convergenceTol:
          A condition which decides iteration termination.
          (default: 0.001)
        """
        deprecation_message = (
            "Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 0.0. "
            "Note the default regParam is 0.01 for RidgeRegressionWithSGD, but is 0.0 for "
            "LinearRegression.")
        warnings.warn(deprecation_message, DeprecationWarning)

        def run_sgd(points, start_weights):
            # Arguments are coerced here, at call time, in the positional
            # order passed to "trainRidgeModelWithSGD".
            return callMLlibFunc(
                "trainRidgeModelWithSGD",
                points,
                int(iterations),
                float(step),
                float(regParam),
                float(miniBatchFraction),
                start_weights,
                bool(intercept),
                bool(validateData),
                float(convergenceTol))

        return _regression_train_wrapper(run_sgd, RidgeRegressionModel, data, initialWeights)
0581
0582
class IsotonicRegressionModel(Saveable, Loader):

    """
    Regression model for isotonic regression.

    :param boundaries:
      Array of boundaries for which predictions are known. Boundaries
      must be sorted in increasing order.
    :param predictions:
      Array of predictions associated to the boundaries at the same
      index. Results of isotonic regression and therefore monotone.
    :param isotonic:
      Indicates whether this is isotonic or antitonic.

    >>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
    >>> irm = IsotonicRegression.train(sc.parallelize(data))
    >>> irm.predict(3)
    2.0
    >>> irm.predict(5)
    16.5
    >>> irm.predict(sc.parallelize([3, 5])).collect()
    [2.0, 16.5]
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> irm.save(sc, path)
    >>> sameModel = IsotonicRegressionModel.load(sc, path)
    >>> sameModel.predict(3)
    2.0
    >>> sameModel.predict(5)
    16.5
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 1.4.0
    """

    def __init__(self, boundaries, predictions, isotonic):
        # Stored as given. save() calls .tolist() on boundaries and
        # predictions, so they need to provide it (e.g. NumPy arrays).
        self.boundaries = boundaries
        self.predictions = predictions
        self.isotonic = isotonic

    @since("1.4.0")
    def predict(self, x):
        """
        Predict labels for provided features.
        Using a piecewise linear function.
        1) If x exactly matches a boundary then associated prediction
        is returned. In case there are multiple predictions with the
        same boundary then one of them is returned. Which one is
        undefined (same as java.util.Arrays.binarySearch).
        2) If x is lower or higher than all boundaries then first or
        last prediction is returned respectively. In case there are
        multiple predictions with the same boundary then the lowest
        or highest is returned respectively.
        3) If x falls between two values in boundary array then
        prediction is treated as piecewise linear function and
        interpolated value is returned. In case there are multiple
        values with the same boundary then the same rules as in 2)
        are used.

        :param x:
          Feature or RDD of Features to be labeled.
        :return:
          The interpolated prediction for a single feature, or an RDD of
          predictions when ``x`` is an RDD.
        """
        if isinstance(x, RDD):
            return x.map(lambda v: self.predict(v))
        return np.interp(x, self.boundaries, self.predictions)

    @since("1.4.0")
    def save(self, sc, path):
        """
        Save an IsotonicRegressionModel.

        A JVM-side IsotonicRegressionModel is built from the boundaries,
        predictions and isotonic flag, and its own ``save`` does the
        writing.
        """
        java_boundaries = _py2java(sc, self.boundaries.tolist())
        java_predictions = _py2java(sc, self.predictions.tolist())
        java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
            java_boundaries, java_predictions, self.isotonic)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since("1.4.0")
    def load(cls, sc, path):
        """
        Load an IsotonicRegressionModel.

        The JVM-side model is loaded from ``path`` and its boundaries,
        predictions and isotonic flag are copied into a new Python model.
        """
        java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
            sc._jsc.sc(), path)
        py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
        py_predictions = _java2py(sc, java_model.predictionVector()).toArray()
        # ``isotonic`` has to be *called*, like the other JVM accessors
        # above; the bare attribute is a Py4J member object, not the
        # boolean, and would break a later save() of the loaded model.
        py_isotonic = java_model.isotonic()
        return IsotonicRegressionModel(py_boundaries, py_predictions, py_isotonic)
0671
0672
class IsotonicRegression(object):
    """
    Isotonic regression.
    Currently implemented using parallelized pool adjacent violators
    algorithm. Only univariate (single feature) algorithm supported.

    Sequential PAV implementation based on:

      Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
      "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
      Available from http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf

    Sequential PAV parallelization based on:

      Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
      "An approach to parallelizing isotonic regression."
      Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
      Available from http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf

    See `Isotonic regression (Wikipedia) <http://en.wikipedia.org/wiki/Isotonic_regression>`_.

    .. versionadded:: 1.4.0
    """

    @classmethod
    @since("1.4.0")
    def train(cls, data, isotonic=True):
        """
        Fit an isotonic regression model to the given data.

        :param data:
          RDD of (label, feature, weight) tuples.
        :param isotonic:
          Whether this is isotonic (which is default) or antitonic.
          (default: True)
        :return:
          An IsotonicRegressionModel built from the trained boundaries
          and predictions.
        """
        # Every input tuple is converted to a vector before training.
        vectors = data.map(_convert_to_vector)
        trained = callMLlibFunc("trainIsotonicRegressionModel", vectors, bool(isotonic))
        boundaries, predictions = trained
        return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)
0712
0713
class StreamingLinearAlgorithm(object):
    """
    Base class that has to be inherited by any StreamingLinearAlgorithm.

    Prevents reimplementation of methods predictOn and predictOnValues.

    :param model:
      The model used for prediction, or None if it has not been set yet.

    .. versionadded:: 1.5.0
    """
    def __init__(self, model):
        self._model = model

    @since("1.5.0")
    def latestModel(self):
        """
        Returns the latest model.
        """
        return self._model

    def _validate(self, dstream):
        """
        Check that ``dstream`` is a DStream and that a model is available.

        :raises TypeError:
          If ``dstream`` is not a DStream.
        :raises ValueError:
          If no model has been set yet.
        """
        if not isinstance(dstream, DStream):
            raise TypeError(
                "dstream should be a DStream object, got %s" % type(dstream))
        # Test for a missing model explicitly rather than through
        # truthiness, so a model object that happens to be falsy is still
        # accepted.
        if self._model is None:
            raise ValueError(
                "Model must be initialized using setInitialWeights")

    @since("1.5.0")
    def predictOn(self, dstream):
        """
        Use the model to make predictions on batches of data from a
        DStream.

        :return:
          DStream containing predictions.
        """
        self._validate(dstream)
        # self._model is looked up when the lambda runs, not when it is created.
        return dstream.map(lambda x: self._model.predict(x))

    @since("1.5.0")
    def predictOnValues(self, dstream):
        """
        Use the model to make predictions on the values of a DStream and
        carry over its keys.

        :return:
          DStream containing the input keys and the predictions as values.
        """
        self._validate(dstream)
        return dstream.mapValues(lambda x: self._model.predict(x))
0763
0764
@inherit_doc
class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
    """
    Train or predict a linear regression model on streaming data.
    Training uses Stochastic Gradient Descent to update the model
    based on each new batch of incoming data from a DStream
    (see `LinearRegressionWithSGD` for model equation).

    Each batch of data is assumed to be an RDD of LabeledPoints.
    The number of data points per batch can vary, but the number
    of features must be constant. An initial weight vector must
    be provided.

    :param stepSize:
      Step size for each iteration of gradient descent.
      (default: 0.1)
    :param numIterations:
      Number of iterations run for each batch of data.
      (default: 50)
    :param miniBatchFraction:
      Fraction of each batch of data to use for updates.
      (default: 1.0)
    :param convergenceTol:
      Value used to determine when to terminate iterations.
      (default: 0.001)

    .. versionadded:: 1.5.0
    """
    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, convergenceTol=0.001):
        self.stepSize = stepSize
        self.numIterations = numIterations
        self.miniBatchFraction = miniBatchFraction
        self.convergenceTol = convergenceTol
        # No model exists until setInitialWeights is called; the base
        # class stores the None placeholder in self._model.
        super(StreamingLinearRegressionWithSGD, self).__init__(model=None)

    @since("1.5.0")
    def setInitialWeights(self, initialWeights):
        """
        Set the initial value of weights.

        This must be set before running trainOn and predictOn

        :return:
          This instance, so calls can be chained.
        """
        start_vector = _convert_to_vector(initialWeights)
        # The starting model uses the given weights and a zero intercept.
        self._model = LinearRegressionModel(start_vector, 0)
        return self

    @since("1.5.0")
    def trainOn(self, dstream):
        """Train the model on the incoming dstream."""
        self._validate(dstream)

        def refit(batch):
            # Empty batches leave the current model untouched.
            if batch.isEmpty():
                return
            # NOTE(review): the model's numeric intercept is passed as the
            # boolean ``intercept`` flag of LinearRegressionWithSGD.train --
            # confirm this is intended.
            self._model = LinearRegressionWithSGD.train(
                batch,
                iterations=self.numIterations,
                step=self.stepSize,
                miniBatchFraction=self.miniBatchFraction,
                initialWeights=self._model.weights,
                intercept=self._model.intercept,
                convergenceTol=self.convergenceTol)

        dstream.foreachRDD(refit)
0827
0828
def _test():
    """
    Run this module's doctests against a local SparkSession.

    The doctest globals are a copy of pyspark.mllib.regression's namespace
    with ``sc`` bound to the session's SparkContext. The process exits
    with status -1 if any doctest fails.
    """
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.mllib.regression
    doctest_globals = pyspark.mllib.regression.__dict__.copy()
    session_builder = SparkSession.builder
    session_builder = session_builder.master("local[2]")
    session_builder = session_builder.appName("mllib.regression tests")
    spark = session_builder.getOrCreate()
    doctest_globals['sc'] = spark.sparkContext
    outcome = doctest.testmod(globs=doctest_globals, optionflags=doctest.ELLIPSIS)
    failures, _attempted = outcome
    spark.stop()
    if failures:
        sys.exit(-1)
0843
# Run the module's doctests when this file is executed as a script.
if __name__ == "__main__":
    _test()