#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
import warnings

import numpy as np

from pyspark import RDD, since
from pyspark.streaming.dstream import DStream
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.mllib.util import Saveable, Loader

__all__ = ['LabeledPoint', 'LinearModel',
           'LinearRegressionModel', 'LinearRegressionWithSGD',
           'RidgeRegressionModel', 'RidgeRegressionWithSGD',
           'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
           'IsotonicRegression', 'StreamingLinearAlgorithm',
           'StreamingLinearRegressionWithSGD']


class LabeledPoint(object):

    """
    Class that represents the features and labels of a data point.

    :param label:
      Label for this data point.
    :param features:
      Vector of features for this point (NumPy array, list,
      pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix).

    .. note:: 'label' and 'features' are accessible as instance attributes.

    .. versionadded:: 1.0.0
    """

    def __init__(self, label, features):
        self.label = float(label)
        self.features = _convert_to_vector(features)

    def __reduce__(self):
        return (LabeledPoint, (self.label, self.features))

    def __str__(self):
        return "(" + ",".join((str(self.label), str(self.features))) + ")"

    def __repr__(self):
        return "LabeledPoint(%s, %s)" % (self.label, self.features)

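# A minimal usage sketch for LabeledPoint (illustrative values only). Both
# dense and sparse feature representations are accepted; _convert_to_vector
# normalizes them:
#
#     dense = LabeledPoint(1.0, [2.0, 0.0, 3.0])
#     sparse = LabeledPoint(0.0, SparseVector(3, {0: 2.0, 2: 3.0}))
#     dense.label      # 1.0 (labels are always stored as float)
#     sparse.features  # SparseVector(3, {0: 2.0, 2: 3.0})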

class LinearModel(object):

    """
    A linear model that has a vector of coefficients and an intercept.

    :param weights:
      Weights computed for every feature.
    :param intercept:
      Intercept computed for this model.

    .. versionadded:: 0.9.0
    """

    def __init__(self, weights, intercept):
        self._coeff = _convert_to_vector(weights)
        self._intercept = float(intercept)

    @property
    @since("1.0.0")
    def weights(self):
        """Weights computed for every feature."""
        return self._coeff

    @property
    @since("1.0.0")
    def intercept(self):
        """Intercept computed for this model."""
        return self._intercept

    def __repr__(self):
        return "(weights=%s, intercept=%r)" % (self._coeff, self._intercept)


@inherit_doc
class LinearRegressionModelBase(LinearModel):

    """A linear regression model.

    >>> lrmb = LinearRegressionModelBase(np.array([1.0, 2.0]), 0.1)
    >>> abs(lrmb.predict(np.array([-1.03, 7.777])) - 14.624) < 1e-6
    True
    >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
    True

    .. versionadded:: 0.9.0
    """

    @since("0.9.0")
    def predict(self, x):
        """
        Predict the value of the dependent variable given a vector or
        an RDD of vectors containing values for the independent variables.
        """
        if isinstance(x, RDD):
            return x.map(self.predict)
        x = _convert_to_vector(x)
        return self.weights.dot(x) + self.intercept

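# predict() above is polymorphic: given a single vector it returns a float,
# and given an RDD of vectors it maps itself over the elements. A sketch
# (assumes an active SparkContext named sc; the numbers are made up):
#
#     model = LinearRegressionModelBase(np.array([2.0]), 1.0)
#     model.predict([3.0])                                      # 7.0
#     model.predict(sc.parallelize([[3.0], [4.0]])).collect()   # [7.0, 9.0]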

@inherit_doc
class LinearRegressionModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
    True
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> lrm.save(sc, path)
    >>> sameModel = LinearRegressionModel.load(sc, path)
    >>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
    ...    miniBatchFraction=1.0, initialWeights=np.array([1.0]), regParam=0.1, regType="l2",
    ...    intercept=True, validateData=True)
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True

    .. versionadded:: 0.9.0
    """
    @since("1.4.0")
    def save(self, sc, path):
        """Save a LinearRegressionModel."""
        java_model = sc._jvm.org.apache.spark.mllib.regression.LinearRegressionModel(
            _py2java(sc, self._coeff), self.intercept)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since("1.4.0")
    def load(cls, sc, path):
        """Load a LinearRegressionModel."""
        java_model = sc._jvm.org.apache.spark.mllib.regression.LinearRegressionModel.load(
            sc._jsc.sc(), path)
        weights = _java2py(sc, java_model.weights())
        intercept = java_model.intercept()
        return LinearRegressionModel(weights, intercept)


# train_func should take two parameters, namely data and initial_weights, and
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking; a
# sketch of this contract follows the function below.
def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
    from pyspark.mllib.classification import LogisticRegressionModel
    first = data.first()
    if not isinstance(first, LabeledPoint):
        raise TypeError("data should be an RDD of LabeledPoint, but got %s" % type(first))
    if initial_weights is None:
        initial_weights = [0.0] * len(first.features)
    if modelClass == LogisticRegressionModel:
        weights, intercept, numFeatures, numClasses = train_func(
            data, _convert_to_vector(initial_weights))
        return modelClass(weights, intercept, numFeatures, numClasses)
    else:
        weights, intercept = train_func(data, _convert_to_vector(initial_weights))
        return modelClass(weights, intercept)

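# A sketch of the train_func contract described above. The stub name
# "trainExampleModelWithSGD" is hypothetical; real callers pass a closure
# over an actual JVM stub, as LinearRegressionWithSGD.train does below:
#
#     def example_train_func(rdd, initial_weights):
#         # must return (weights, intercept) for non-logistic model classes
#         return callMLlibFunc("trainExampleModelWithSGD", rdd, initial_weights)
#
#     model = _regression_train_wrapper(example_train_func,
#                                       LinearRegressionModel, data, None)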

class LinearRegressionWithSGD(object):
    """
    .. versionadded:: 0.9.0
    .. note:: Deprecated in 2.0.0. Use ml.regression.LinearRegression.
    """
    @classmethod
    @since("0.9.0")
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=0.0, regType=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a linear regression model using Stochastic Gradient
        Descent (SGD). This solves the least squares regression
        formulation

            f(weights) = 1/(2n) ||A weights - y||^2

        which is half the mean squared error. Here the data matrix has
        n rows, and the input RDD holds the set of rows of A, each with
        its corresponding right hand side label y.
        See also the documentation for the precise formulation.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param step:
          The step size used in SGD.
          (default: 1.0)
        :param miniBatchFraction:
          Fraction of data to be used for each SGD iteration.
          (default: 1.0)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param regParam:
          The regularizer parameter.
          (default: 0.0)
        :param regType:
          The type of regularizer used for training our model.
          Supported values:

            - "l1" for using L1 regularization
            - "l2" for using L2 regularization
            - None for no regularization (default)
        :param intercept:
          Boolean parameter which indicates whether to use an augmented
          representation of the training data (i.e., whether bias
          features are added).
          (default: False)
        :param validateData:
          Boolean parameter which indicates whether the algorithm should
          validate the data before training.
          (default: True)
        :param convergenceTol:
          Tolerance used to decide when to terminate the iterations.
          (default: 0.001)
        """
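        # A concrete NumPy check of the objective above (a sketch with
        # made-up values; not part of the training path):
        #
        #     A = np.array([[0.0], [1.0], [2.0], [3.0]])   # n = 4 rows of A
        #     y = np.array([0.0, 1.0, 3.0, 2.0])
        #     w = np.array([1.0])
        #     f = np.sum((A.dot(w) - y) ** 2) / (2 * len(y))
        #     # f == 0.25, i.e. 1/(2n) * ||A w - y||^2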
        warnings.warn(
            "Deprecated in 2.0.0. Use ml.regression.LinearRegression.", DeprecationWarning)

        def train(rdd, i):
            return callMLlibFunc("trainLinearRegressionModelWithSGD", rdd, int(iterations),
                                 float(step), float(miniBatchFraction), i, float(regParam),
                                 regType, bool(intercept), bool(validateData),
                                 float(convergenceTol))

        return _regression_train_wrapper(train, LinearRegressionModel, data, initialWeights)


@inherit_doc
class LassoModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit with
    an l_1 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LassoWithSGD.train(
    ...     sc.parallelize(data), iterations=10, initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
    True
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> lrm.save(sc, path)
    >>> sameModel = LassoModel.load(sc, path)
    >>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> from shutil import rmtree
    >>> try:
    ...    rmtree(path)
    ... except OSError:
    ...    pass
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
    ...     regParam=0.01, miniBatchFraction=1.0, initialWeights=np.array([1.0]), intercept=True,
    ...     validateData=True)
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True

    .. versionadded:: 0.9.0
    """
    @since("1.4.0")
    def save(self, sc, path):
        """Save a LassoModel."""
        java_model = sc._jvm.org.apache.spark.mllib.regression.LassoModel(
            _py2java(sc, self._coeff), self.intercept)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since("1.4.0")
    def load(cls, sc, path):
        """Load a LassoModel."""
        java_model = sc._jvm.org.apache.spark.mllib.regression.LassoModel.load(
            sc._jsc.sc(), path)
        weights = _java2py(sc, java_model.weights())
        intercept = java_model.intercept()
        return LassoModel(weights, intercept)


class LassoWithSGD(object):
    """
    .. versionadded:: 0.9.0
    .. note:: Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 1.0.
            Note the default regParam is 0.01 for LassoWithSGD, but is 0.0 for LinearRegression.
    """
    @classmethod
    @since("0.9.0")
    def train(cls, data, iterations=100, step=1.0, regParam=0.01,
              miniBatchFraction=1.0, initialWeights=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a regression model with L1-regularization using Stochastic
        Gradient Descent. This solves the l1-regularized least squares
        regression formulation

            f(weights) = 1/(2n) ||A weights - y||^2 + regParam ||weights||_1

        Here the data matrix has n rows, and the input RDD holds the set
        of rows of A, each with its corresponding right hand side label y.
        See also the documentation for the precise formulation.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param step:
          The step size used in SGD.
          (default: 1.0)
        :param regParam:
          The regularizer parameter.
          (default: 0.01)
        :param miniBatchFraction:
          Fraction of data to be used for each SGD iteration.
          (default: 1.0)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param intercept:
          Boolean parameter which indicates whether to use an augmented
          representation of the training data (i.e., whether bias
          features are added).
          (default: False)
        :param validateData:
          Boolean parameter which indicates whether the algorithm should
          validate the data before training.
          (default: True)
        :param convergenceTol:
          Tolerance used to decide when to terminate the iterations.
          (default: 0.001)
        """
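        # A NumPy sketch of the l1-regularized objective above (made-up
        # values; regParam chosen arbitrarily; not part of the training path):
        #
        #     A = np.array([[0.0], [1.0], [2.0], [3.0]])
        #     y = np.array([0.0, 1.0, 3.0, 2.0])
        #     w, regParam = np.array([1.0]), 0.01
        #     f = (np.sum((A.dot(w) - y) ** 2) / (2 * len(y))
        #          + regParam * np.abs(w).sum())
        #     # f == 0.25 + 0.01 == 0.26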
        warnings.warn(
            "Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 1.0. "
            "Note the default regParam is 0.01 for LassoWithSGD, but is 0.0 for LinearRegression.",
            DeprecationWarning)

        def train(rdd, i):
            return callMLlibFunc("trainLassoModelWithSGD", rdd, int(iterations), float(step),
                                 float(regParam), float(miniBatchFraction), i, bool(intercept),
                                 bool(validateData), float(convergenceTol))

        return _regression_train_wrapper(train, LassoModel, data, initialWeights)


@inherit_doc
class RidgeRegressionModel(LinearRegressionModelBase):

    """A linear regression model derived from a least-squares fit with
    an l_2 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
    True
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> lrm.save(sc, path)
    >>> sameModel = RidgeRegressionModel.load(sc, path)
    >>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
    True
    >>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> from shutil import rmtree
    >>> try:
    ...    rmtree(path)
    ... except OSError:
    ...    pass
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10,
    ...     initialWeights=np.array([1.0]))
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
    ...     regParam=0.01, miniBatchFraction=1.0, initialWeights=np.array([1.0]), intercept=True,
    ...     validateData=True)
    >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True

    .. versionadded:: 0.9.0
    """
    @since("1.4.0")
    def save(self, sc, path):
        """Save a RidgeRegressionModel."""
        java_model = sc._jvm.org.apache.spark.mllib.regression.RidgeRegressionModel(
            _py2java(sc, self._coeff), self.intercept)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since("1.4.0")
    def load(cls, sc, path):
        """Load a RidgeRegressionModel."""
        java_model = sc._jvm.org.apache.spark.mllib.regression.RidgeRegressionModel.load(
            sc._jsc.sc(), path)
        weights = _java2py(sc, java_model.weights())
        intercept = java_model.intercept()
        return RidgeRegressionModel(weights, intercept)


class RidgeRegressionWithSGD(object):
    """
    .. versionadded:: 0.9.0
    .. note:: Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 0.0.
            Note the default regParam is 0.01 for RidgeRegressionWithSGD, but is 0.0 for
            LinearRegression.
    """
    @classmethod
    @since("0.9.0")
    def train(cls, data, iterations=100, step=1.0, regParam=0.01,
              miniBatchFraction=1.0, initialWeights=None, intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Train a regression model with L2-regularization using Stochastic
        Gradient Descent. This solves the l2-regularized least squares
        regression formulation

            f(weights) = 1/(2n) ||A weights - y||^2 + regParam/2 ||weights||^2

        Here the data matrix has n rows, and the input RDD holds the set
        of rows of A, each with its corresponding right hand side label y.
        See also the documentation for the precise formulation.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param step:
          The step size used in SGD.
          (default: 1.0)
        :param regParam:
          The regularizer parameter.
          (default: 0.01)
        :param miniBatchFraction:
          Fraction of data to be used for each SGD iteration.
          (default: 1.0)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param intercept:
          Boolean parameter which indicates whether to use an augmented
          representation of the training data (i.e., whether bias
          features are added).
          (default: False)
        :param validateData:
          Boolean parameter which indicates whether the algorithm should
          validate the data before training.
          (default: True)
        :param convergenceTol:
          Tolerance used to decide when to terminate the iterations.
          (default: 0.001)
        """
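        # A NumPy sketch of the l2-regularized objective above (same made-up
        # values as the lasso sketch; note the extra factor of 1/2 on the
        # penalty):
        #
        #     A = np.array([[0.0], [1.0], [2.0], [3.0]])
        #     y = np.array([0.0, 1.0, 3.0, 2.0])
        #     w, regParam = np.array([1.0]), 0.01
        #     f = (np.sum((A.dot(w) - y) ** 2) / (2 * len(y))
        #          + regParam / 2.0 * w.dot(w))
        #     # f == 0.25 + 0.005 == 0.255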
        warnings.warn(
            "Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 0.0. "
            "Note the default regParam is 0.01 for RidgeRegressionWithSGD, but is 0.0 for "
            "LinearRegression.", DeprecationWarning)

        def train(rdd, i):
            return callMLlibFunc("trainRidgeModelWithSGD", rdd, int(iterations), float(step),
                                 float(regParam), float(miniBatchFraction), i, bool(intercept),
                                 bool(validateData), float(convergenceTol))

        return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)


class IsotonicRegressionModel(Saveable, Loader):

    """
    Regression model for isotonic regression.

    :param boundaries:
      Array of boundaries for which predictions are known. Boundaries
      must be sorted in increasing order.
    :param predictions:
      Array of predictions associated with the boundaries at the same
      index. These are the results of the isotonic regression and are
      therefore monotone.
    :param isotonic:
      Indicates whether this is isotonic (true) or antitonic (false).

    >>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
    >>> irm = IsotonicRegression.train(sc.parallelize(data))
    >>> irm.predict(3)
    2.0
    >>> irm.predict(5)
    16.5
    >>> irm.predict(sc.parallelize([3, 5])).collect()
    [2.0, 16.5]
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> irm.save(sc, path)
    >>> sameModel = IsotonicRegressionModel.load(sc, path)
    >>> sameModel.predict(3)
    2.0
    >>> sameModel.predict(5)
    16.5
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 1.4.0
    """

    def __init__(self, boundaries, predictions, isotonic):
        self.boundaries = boundaries
        self.predictions = predictions
        self.isotonic = isotonic

    @since("1.4.0")
    def predict(self, x):
        """
        Predict labels for the provided features using a piecewise
        linear function. The prediction is determined as follows:

        1) If x exactly matches a boundary, the associated prediction
        is returned. If multiple predictions share the same boundary,
        one of them is returned; which one is undefined (as in
        java.util.Arrays.binarySearch).
        2) If x is lower or higher than all boundaries, the first or
        last prediction is returned, respectively. If multiple
        predictions share the same boundary, the lowest or highest one
        is returned, respectively.
        3) If x falls between two boundaries, the prediction is treated
        as a piecewise linear function and an interpolated value is
        returned. If multiple values share the same boundary, the same
        rules as in 2) apply.

        :param x:
          Feature or RDD of features to be labeled.
        """
        if isinstance(x, RDD):
            return x.map(lambda v: self.predict(v))
        return np.interp(x, self.boundaries, self.predictions)

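    # predict() above delegates to np.interp, which matches the docstring
    # rules when the boundaries are distinct. A sketch with made-up values:
    #
    #     boundaries = np.array([1.0, 3.0, 5.0])
    #     predictions = np.array([2.0, 2.0, 10.0])
    #     np.interp(3.0, boundaries, predictions)   # 2.0 (exact boundary hit)
    #     np.interp(0.0, boundaries, predictions)   # 2.0 (clamped to first)
    #     np.interp(4.0, boundaries, predictions)   # 6.0 (linear interpolation)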
    @since("1.4.0")
    def save(self, sc, path):
        """Save an IsotonicRegressionModel."""
        java_boundaries = _py2java(sc, self.boundaries.tolist())
        java_predictions = _py2java(sc, self.predictions.tolist())
        java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
            java_boundaries, java_predictions, self.isotonic)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since("1.4.0")
    def load(cls, sc, path):
        """Load an IsotonicRegressionModel."""
        java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
            sc._jsc.sc(), path)
        py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
        py_predictions = _java2py(sc, java_model.predictionVector()).toArray()
        return IsotonicRegressionModel(py_boundaries, py_predictions, java_model.isotonic)


class IsotonicRegression(object):
    """
    Isotonic regression.
    Currently implemented using the parallelized pool adjacent violators
    (PAV) algorithm. Only the univariate (single feature) algorithm is
    supported.

    Sequential PAV implementation based on:

      Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
      "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
      Available from http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf

    Sequential PAV parallelization based on:

        Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
        "An approach to parallelizing isotonic regression."
        Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
        Available from http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf

    See `Isotonic regression (Wikipedia) <http://en.wikipedia.org/wiki/Isotonic_regression>`_.

    .. versionadded:: 1.4.0
    """

    @classmethod
    @since("1.4.0")
    def train(cls, data, isotonic=True):
        """
        Train an isotonic regression model on the given data.

        :param data:
          RDD of (label, feature, weight) tuples.
        :param isotonic:
          Whether this is isotonic (the default) or antitonic.
          (default: True)
        """
        boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel",
                                                data.map(_convert_to_vector), bool(isotonic))
        return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)


class StreamingLinearAlgorithm(object):
    """
    Base class to be inherited by any streaming linear algorithm. It
    provides the shared predictOn and predictOnValues methods so that
    subclasses do not have to reimplement them.

    .. versionadded:: 1.5.0
    """
    def __init__(self, model):
        self._model = model

    @since("1.5.0")
    def latestModel(self):
        """
        Returns the latest model.
        """
        return self._model

    def _validate(self, dstream):
        if not isinstance(dstream, DStream):
            raise TypeError(
                "dstream should be a DStream object, got %s" % type(dstream))
        if not self._model:
            raise ValueError(
                "Model must be initialized using setInitialWeights")

    @since("1.5.0")
    def predictOn(self, dstream):
        """
        Use the model to make predictions on batches of data from a
        DStream.

        :return:
          DStream containing predictions.
        """
        self._validate(dstream)
        return dstream.map(lambda x: self._model.predict(x))

    @since("1.5.0")
    def predictOnValues(self, dstream):
        """
        Use the model to make predictions on the values of a DStream and
        carry over its keys.

        :return:
          DStream containing the input keys and the predictions as values.
        """
        self._validate(dstream)
        return dstream.mapValues(lambda x: self._model.predict(x))


@inherit_doc
class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
    """
    Train or predict a linear regression model on streaming data.
    Training uses Stochastic Gradient Descent to update the model
    based on each new batch of incoming data from a DStream
    (see `LinearRegressionWithSGD` for the model equation).

    Each batch of data is assumed to be an RDD of LabeledPoints.
    The number of data points per batch can vary, but the number
    of features must be constant. An initial weight vector must
    be provided. A usage sketch follows the class definition below.

    :param stepSize:
      Step size for each iteration of gradient descent.
      (default: 0.1)
    :param numIterations:
      Number of iterations run for each batch of data.
      (default: 50)
    :param miniBatchFraction:
      Fraction of each batch of data to use for updates.
      (default: 1.0)
    :param convergenceTol:
      Value used to determine when to terminate iterations.
      (default: 0.001)

    .. versionadded:: 1.5.0
    """
    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, convergenceTol=0.001):
        self.stepSize = stepSize
        self.numIterations = numIterations
        self.miniBatchFraction = miniBatchFraction
        self.convergenceTol = convergenceTol
        self._model = None
        super(StreamingLinearRegressionWithSGD, self).__init__(
            model=self._model)

    @since("1.5.0")
    def setInitialWeights(self, initialWeights):
        """
        Set the initial value of the weights.

        This must be set before running trainOn and predictOn.
        """
        initialWeights = _convert_to_vector(initialWeights)
        self._model = LinearRegressionModel(initialWeights, 0)
        return self

    @since("1.5.0")
    def trainOn(self, dstream):
        """Train the model on the incoming dstream."""
        self._validate(dstream)

        def update(rdd):
            # LinearRegressionWithSGD.train raises an error for an empty RDD.
            if not rdd.isEmpty():
                self._model = LinearRegressionWithSGD.train(
                    rdd, self.numIterations, self.stepSize,
                    self.miniBatchFraction, self._model.weights,
                    intercept=self._model.intercept, convergenceTol=self.convergenceTol)

        dstream.foreachRDD(update)

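# A hedged end-to-end sketch of the streaming workflow above (assumes an
# active StreamingContext named ssc and DStreams of LabeledPoints named
# trainingStream and testStream; all of these names are illustrative):
#
#     model = StreamingLinearRegressionWithSGD(stepSize=0.1, numIterations=25)
#     model.setInitialWeights([0.0, 0.0])   # length must match the feature count
#     model.trainOn(trainingStream)         # refit the model on each batch
#     predictions = model.predictOnValues(
#         testStream.map(lambda lp: (lp.label, lp.features)))
#     ssc.start()
#     ssc.awaitTermination()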

def _test():
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.mllib.regression
    globs = pyspark.mllib.regression.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[2]")\
        .appName("mllib.regression tests")\
        .getOrCreate()
    globs['sc'] = spark.sparkContext
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()