0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 from math import exp
0019 import sys
0020 import warnings
0021
0022 import numpy
0023
0024 from pyspark import RDD, since
0025 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
0026 from pyspark.mllib.linalg import SparseVector, _convert_to_vector
0027 from pyspark.mllib.regression import (
0028 LabeledPoint, LinearModel, _regression_train_wrapper,
0029 StreamingLinearAlgorithm)
0030 from pyspark.mllib.util import Saveable, Loader, inherit_doc
0031
0032
# Names exported by ``from pyspark.mllib.classification import *``.
# LinearClassificationModel is not listed here.
__all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'LogisticRegressionWithLBFGS',
           'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes',
           'StreamingLogisticRegressionWithSGD']
0036
0037
class LinearClassificationModel(LinearModel):
    """
    A private abstract class representing a multiclass classification
    model. The categories are represented by int values: 0, 1, 2, etc.

    On top of the weights and intercept passed to :class:`LinearModel`,
    it stores an optional decision threshold. Subclasses are expected
    to implement :meth:`predict`.
    """
    def __init__(self, weights, intercept):
        super(LinearClassificationModel, self).__init__(weights, intercept)
        # None means "no threshold": the subclasses in this module then
        # return the raw score from predict instead of a 0/1 label.
        self._threshold = None

    @since('1.4.0')
    def setThreshold(self, value):
        """
        Sets the threshold that separates positive predictions from
        negative predictions. It is used for binary classification
        only.

        The models defined in this module identify an example as
        positive when its prediction score is strictly greater than
        this threshold, and as negative otherwise.

        :param value:
          The new threshold. It is stored as given, without validation.
        """
        self._threshold = value

    @property
    @since('1.4.0')
    def threshold(self):
        """
        Returns the threshold (if any) used for converting raw
        prediction scores into 0/1 predictions. It is used for
        binary classification only.

        :return:
          The stored threshold, or None when it has been cleared.
        """
        return self._threshold

    @since('1.4.0')
    def clearThreshold(self):
        """
        Clears the threshold so that `predict` will output raw
        prediction scores. It is used for binary classification only.
        """
        self._threshold = None

    @since('1.4.0')
    def predict(self, test):
        """
        Predict values for a single data point or an RDD of points
        using the model trained.

        This base implementation always raises NotImplementedError;
        subclasses provide the actual prediction logic.

        :param test:
          A single data point or an RDD of points.
        """
        raise NotImplementedError
0083
0084
class LogisticRegressionModel(LinearClassificationModel):

    """
    Classification model trained using Multinomial/Binary Logistic
    Regression.

    :param weights:
      Weights computed for every feature.
    :param intercept:
      Intercept computed for this model. (Only used in Binary Logistic
      Regression. In Multinomial Logistic Regression, the intercepts will
      not be a single value, so the intercepts will be part of the
      weights.)
    :param numFeatures:
      The dimension of the features.
    :param numClasses:
      The number of possible outcomes for k classes classification problem
      in Multinomial Logistic Regression. By default, it is binary
      logistic regression so numClasses will be set to 2.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0, 1.0]),
    ...     LabeledPoint(1.0, [1.0, 0.0]),
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)
    >>> lrm.predict([1.0, 0.0])
    1
    >>> lrm.predict([0.0, 1.0])
    0
    >>> lrm.predict(sc.parallelize([[1.0, 0.0], [0.0, 1.0]])).collect()
    [1, 0]
    >>> lrm.clearThreshold()
    >>> lrm.predict([0.0, 1.0])
    0.279...

    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10)
    >>> lrm.predict(numpy.array([0.0, 1.0]))
    1
    >>> lrm.predict(numpy.array([1.0, 0.0]))
    0
    >>> lrm.predict(SparseVector(2, {1: 1.0}))
    1
    >>> lrm.predict(SparseVector(2, {0: 1.0}))
    0
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> lrm.save(sc, path)
    >>> sameModel = LogisticRegressionModel.load(sc, path)
    >>> sameModel.predict(numpy.array([0.0, 1.0]))
    1
    >>> sameModel.predict(SparseVector(2, {0: 1.0}))
    0
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass
    >>> multi_class_data = [
    ...     LabeledPoint(0.0, [0.0, 1.0, 0.0]),
    ...     LabeledPoint(1.0, [1.0, 0.0, 0.0]),
    ...     LabeledPoint(2.0, [0.0, 0.0, 1.0])
    ... ]
    >>> data = sc.parallelize(multi_class_data)
    >>> mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3)
    >>> mcm.predict([0.0, 0.5, 0.0])
    0
    >>> mcm.predict([0.8, 0.0, 0.0])
    1
    >>> mcm.predict([0.0, 0.0, 0.3])
    2

    .. versionadded:: 0.9.0
    """
    def __init__(self, weights, intercept, numFeatures, numClasses):
        super(LogisticRegressionModel, self).__init__(weights, intercept)
        self._numFeatures = int(numFeatures)
        self._numClasses = int(numClasses)
        # Binary predictions are thresholded at 0.5 unless changed later.
        self._threshold = 0.5
        if self._numClasses == 2:
            # The binary case predicts straight from weights and intercept.
            self._dataWithBiasSize = None
            self._weightsMatrix = None
        else:
            # The flat coefficient vector holds one row per non-reference
            # class (numClasses - 1 rows). Each row has _dataWithBiasSize
            # entries; predict treats a trailing extra entry as the intercept.
            # NOTE(review): self._coeff is presumably set by
            # LinearModel.__init__ from `weights` - confirm.
            self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1)
            self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1,
                                                                self._dataWithBiasSize)

    @property
    @since('1.4.0')
    def numFeatures(self):
        """
        Dimension of the features.
        """
        return self._numFeatures

    @property
    @since('1.4.0')
    def numClasses(self):
        """
        Number of possible outcomes for k classes classification problem
        in Multinomial Logistic Regression.
        """
        return self._numClasses

    @since('0.9.0')
    def predict(self, x):
        """
        Predict values for a single data point or an RDD of points
        using the model trained.

        :param x:
          A single feature vector (anything accepted by
          ``_convert_to_vector``) or an RDD of such vectors.
        :return:
          For an RDD, an RDD of per-point predictions. For a single
          point of a binary model, the sigmoid of the margin when the
          threshold has been cleared, otherwise 1 if that value is
          strictly greater than the threshold and 0 if not. For a
          multinomial model, the index of the predicted class.
        """
        if isinstance(x, RDD):
            return x.map(lambda v: self.predict(v))

        x = _convert_to_vector(x)
        if self.numClasses == 2:
            margin = self.weights.dot(x) + self._intercept
            # Pick the sigmoid form whose exp() argument is never positive,
            # so exp() cannot overflow for large |margin|.
            if margin > 0:
                prob = 1 / (1 + exp(-margin))
            else:
                exp_margin = exp(margin)
                prob = exp_margin / (1 + exp_margin)
            if self._threshold is None:
                return prob
            return 1 if prob > self._threshold else 0

        # Multinomial case: class 0 is the reference class with an implicit
        # margin of 0.0; row i of the weights matrix scores class i + 1.
        # A row one entry longer than x carries the intercept in its last slot.
        has_intercept = x.size + 1 == self._dataWithBiasSize
        best_class = 0
        max_margin = 0.0
        for i in range(self._numClasses - 1):
            row = self._weightsMatrix[i]
            if has_intercept:
                margin = x.dot(row[0:x.size]) + row[x.size]
            else:
                margin = x.dot(row)
            if margin > max_margin:
                max_margin = margin
                best_class = i + 1
        return best_class

    @since('1.4.0')
    def save(self, sc, path):
        """
        Save this model to the given path.

        A JVM LogisticRegressionModel is built from the coefficients,
        intercept, numFeatures and numClasses, and its ``save`` is
        called. The Python-side threshold is not passed to the JVM
        model here.

        :param sc:
          The SparkContext used to reach the JVM.
        :param path:
          Destination path handed to the JVM model's ``save``.
        """
        java_model = sc._jvm.org.apache.spark.mllib.classification.LogisticRegressionModel(
            _py2java(sc, self._coeff), self.intercept, self.numFeatures, self.numClasses)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since('1.4.0')
    def load(cls, sc, path):
        """
        Load a model from the given path.

        :param sc:
          The SparkContext used to reach the JVM.
        :param path:
          Path previously written by :meth:`save`.
        :return:
          A LogisticRegressionModel carrying the weights, intercept,
          numFeatures, numClasses and threshold read from the JVM model.
        """
        java_model = sc._jvm.org.apache.spark.mllib.classification.LogisticRegressionModel.load(
            sc._jsc.sc(), path)
        weights = _java2py(sc, java_model.weights())
        intercept = java_model.intercept()
        numFeatures = java_model.numFeatures()
        numClasses = java_model.numClasses()
        threshold = java_model.getThreshold().get()
        model = LogisticRegressionModel(weights, intercept, numFeatures, numClasses)
        model.setThreshold(threshold)
        return model

    def __repr__(self):
        """
        Return a short description of the model's scalar parameters.
        """
        # Built from Python-side state: this class keeps no handle to a JVM
        # model (save() constructs one on demand), so there is no Java
        # object whose toString could be delegated to.
        return (
            "pyspark.mllib.LogisticRegressionModel: intercept = {}, "
            "numFeatures = {}, numClasses = {}, threshold = {}"
        ).format(self._intercept, self._numFeatures, self._numClasses, self._threshold)
0261
0262
class LogisticRegressionWithSGD(object):
    """
    Trainer for logistic regression models fitted with SGD.

    .. versionadded:: 0.9.0
    .. note:: Deprecated in 2.0.0. Use ml.classification.LogisticRegression or
        LogisticRegressionWithLBFGS.
    """
    @classmethod
    @since('0.9.0')
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=0.01, regType="l2", intercept=False,
              validateData=True, convergenceTol=0.001):
        """
        Fit a logistic regression model to the given data.

        A DeprecationWarning is issued on every call.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param step:
          The step parameter used in SGD.
          (default: 1.0)
        :param miniBatchFraction:
          Fraction of data to be used for each SGD iteration.
          (default: 1.0)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param regParam:
          The regularizer parameter.
          (default: 0.01)
        :param regType:
          The type of regularizer used for training our model.
          Supported values:

            - "l1" for using L1 regularization
            - "l2" for using L2 regularization (default)
            - None for no regularization
        :param intercept:
          Boolean parameter which indicates the use or not of the
          augmented representation for training data (i.e., whether bias
          features are activated or not).
          (default: False)
        :param validateData:
          Boolean parameter which indicates if the algorithm should
          validate data before training.
          (default: True)
        :param convergenceTol:
          A condition which decides iteration termination.
          (default: 0.001)
        :return:
          Whatever ``_regression_train_wrapper`` returns for
          LogisticRegressionModel (presumably the fitted model; the
          wrapper is defined outside this module).
        """
        deprecation_notice = (
            "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "
            "LogisticRegressionWithLBFGS.")
        warnings.warn(deprecation_notice, DeprecationWarning)

        def run_sgd(points, start_weights):
            # Hyper-parameters are coerced only when the wrapper invokes
            # this callback, right before they are sent to the JVM.
            return callMLlibFunc(
                "trainLogisticRegressionModelWithSGD",
                points,
                int(iterations),
                float(step),
                float(miniBatchFraction),
                start_weights,
                float(regParam),
                regType,
                bool(intercept),
                bool(validateData),
                float(convergenceTol))

        return _regression_train_wrapper(
            run_sgd, LogisticRegressionModel, data, initialWeights)
0324
0325
class LogisticRegressionWithLBFGS(object):
    """
    Trainer for logistic regression models fitted with L-BFGS.

    .. versionadded:: 1.2.0
    """
    @classmethod
    @since('1.2.0')
    def train(cls, data, iterations=100, initialWeights=None, regParam=0.0, regType="l2",
              intercept=False, corrections=10, tolerance=1e-6, validateData=True, numClasses=2):
        """
        Fit a logistic regression model to the given data.

        When no initial weights are supplied, an all-zero vector is
        built from the feature count of the first record in `data`.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param regParam:
          The regularizer parameter.
          (default: 0.0)
        :param regType:
          The type of regularizer used for training our model.
          Supported values:

            - "l1" for using L1 regularization
            - "l2" for using L2 regularization (default)
            - None for no regularization
        :param intercept:
          Boolean parameter which indicates the use or not of the
          augmented representation for training data (i.e., whether bias
          features are activated or not).
          (default: False)
        :param corrections:
          The number of corrections used in the LBFGS update.
          If a known updater is used for binary classification,
          it calls the ml implementation and this parameter will
          have no effect. (default: 10)
        :param tolerance:
          The convergence tolerance of iterations for L-BFGS.
          (default: 1e-6)
        :param validateData:
          Boolean parameter which indicates if the algorithm should
          validate data before training.
          (default: True)
        :param numClasses:
          The number of classes (i.e., outcomes) a label can take in
          Multinomial Logistic Regression.
          (default: 2)

        >>> data = [
        ...     LabeledPoint(0.0, [0.0, 1.0]),
        ...     LabeledPoint(1.0, [1.0, 0.0]),
        ... ]
        >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data), iterations=10)
        >>> lrm.predict([1.0, 0.0])
        1
        >>> lrm.predict([0.0, 1.0])
        0
        """
        if initialWeights is None:
            feature_count = len(data.first().features)
            if numClasses == 2:
                # Binary: one weight per feature.
                initialWeights = [0.0] * feature_count
            elif intercept:
                # Multinomial with intercept: each of the numClasses - 1
                # rows gets one extra slot.
                initialWeights = [0.0] * (feature_count + 1) * (numClasses - 1)
            else:
                initialWeights = [0.0] * feature_count * (numClasses - 1)

        def run_lbfgs(points, start_weights):
            # Arguments are coerced only when the wrapper invokes this
            # callback, right before they are sent to the JVM.
            return callMLlibFunc(
                "trainLogisticRegressionModelWithLBFGS",
                points,
                int(iterations),
                start_weights,
                float(regParam),
                regType,
                bool(intercept),
                int(corrections),
                float(tolerance),
                bool(validateData),
                int(numClasses))

        return _regression_train_wrapper(
            run_lbfgs, LogisticRegressionModel, data, initialWeights)
0401
0402
class SVMModel(LinearClassificationModel):

    """
    Model for Support Vector Machines (SVMs).

    :param weights:
      Weights computed for every feature.
    :param intercept:
      Intercept computed for this model.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(data), iterations=10)
    >>> svm.predict([1.0])
    1
    >>> svm.predict(sc.parallelize([[1.0]])).collect()
    [1]
    >>> svm.clearThreshold()
    >>> svm.predict(numpy.array([1.0]))
    1.44...

    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data), iterations=10)
    >>> svm.predict(SparseVector(2, {1: 1.0}))
    1
    >>> svm.predict(SparseVector(2, {0: -1.0}))
    0
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> svm.save(sc, path)
    >>> sameModel = SVMModel.load(sc, path)
    >>> sameModel.predict(SparseVector(2, {1: 1.0}))
    1
    >>> sameModel.predict(SparseVector(2, {0: -1.0}))
    0
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except:
    ...     pass

    .. versionadded:: 0.9.0
    """
    def __init__(self, weights, intercept):
        super(SVMModel, self).__init__(weights, intercept)
        # Margins strictly above 0.0 are labelled positive by default.
        self._threshold = 0.0

    @since('0.9.0')
    def predict(self, x):
        """
        Score one data point, or every point of an RDD, with this model.

        :param x:
          A single feature vector (anything accepted by
          ``_convert_to_vector``) or an RDD of such vectors.
        :return:
          For an RDD, an RDD of per-point predictions. For a single
          point, the raw margin when the threshold has been cleared,
          otherwise 1 if the margin is strictly greater than the
          threshold and 0 if not.
        """
        if isinstance(x, RDD):
            return x.map(lambda point: self.predict(point))

        features = _convert_to_vector(x)
        margin = self.weights.dot(features) + self.intercept
        if self._threshold is None:
            return margin
        if margin > self._threshold:
            return 1
        return 0

    @since('1.4.0')
    def save(self, sc, path):
        """
        Write this model to the given path.

        A JVM SVMModel is built from the coefficients and intercept and
        its ``save`` is called. The Python-side threshold is not passed
        to the JVM model here.

        :param sc:
          The SparkContext used to reach the JVM.
        :param path:
          Destination path handed to the JVM model's ``save``.
        """
        jvm_classification = sc._jvm.org.apache.spark.mllib.classification
        java_model = jvm_classification.SVMModel(
            _py2java(sc, self._coeff), self.intercept)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since('1.4.0')
    def load(cls, sc, path):
        """
        Read a model back from the given path.

        :param sc:
          The SparkContext used to reach the JVM.
        :param path:
          Path previously written by :meth:`save`.
        :return:
          An SVMModel carrying the weights, intercept and threshold
          read from the JVM model.
        """
        jvm_classification = sc._jvm.org.apache.spark.mllib.classification
        java_model = jvm_classification.SVMModel.load(sc._jsc.sc(), path)
        weights = _java2py(sc, java_model.weights())
        intercept = java_model.intercept()
        threshold = java_model.getThreshold().get()
        restored = SVMModel(weights, intercept)
        restored.setThreshold(threshold)
        return restored
0498
0499
class SVMWithSGD(object):
    """
    Trainer for support vector machine models fitted with SGD.

    .. versionadded:: 0.9.0
    """

    @classmethod
    @since('0.9.0')
    def train(cls, data, iterations=100, step=1.0, regParam=0.01,
              miniBatchFraction=1.0, initialWeights=None, regType="l2",
              intercept=False, validateData=True, convergenceTol=0.001):
        """
        Fit a support vector machine to the given data.

        :param data:
          The training data, an RDD of LabeledPoint.
        :param iterations:
          The number of iterations.
          (default: 100)
        :param step:
          The step parameter used in SGD.
          (default: 1.0)
        :param regParam:
          The regularizer parameter.
          (default: 0.01)
        :param miniBatchFraction:
          Fraction of data to be used for each SGD iteration.
          (default: 1.0)
        :param initialWeights:
          The initial weights.
          (default: None)
        :param regType:
          The type of regularizer used for training our model.
          Allowed values:

            - "l1" for using L1 regularization
            - "l2" for using L2 regularization (default)
            - None for no regularization
        :param intercept:
          Boolean parameter which indicates the use or not of the
          augmented representation for training data (i.e. whether bias
          features are activated or not).
          (default: False)
        :param validateData:
          Boolean parameter which indicates if the algorithm should
          validate data before training.
          (default: True)
        :param convergenceTol:
          A condition which decides iteration termination.
          (default: 0.001)
        :return:
          Whatever ``_regression_train_wrapper`` returns for SVMModel
          (presumably the fitted model; the wrapper is defined outside
          this module).
        """
        def run_sgd(points, start_weights):
            # Arguments are coerced only when the wrapper invokes this
            # callback, right before they are sent to the JVM.
            return callMLlibFunc(
                "trainSVMModelWithSGD",
                points,
                int(iterations),
                float(step),
                float(regParam),
                float(miniBatchFraction),
                start_weights,
                regType,
                bool(intercept),
                bool(validateData),
                float(convergenceTol))

        return _regression_train_wrapper(run_sgd, SVMModel, data, initialWeights)
0556
0557
@inherit_doc
class NaiveBayesModel(Saveable, Loader):

    """
    Model for Naive Bayes classifiers.

    :param labels:
      List of labels.
    :param pi:
      Log of class priors, whose dimension is C, number of labels.
    :param theta:
      Log of class conditional probabilities, whose dimension is C-by-D,
      where D is number of features.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0, 0.0]),
    ...     LabeledPoint(0.0, [0.0, 1.0]),
    ...     LabeledPoint(1.0, [1.0, 0.0]),
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(data))
    >>> model.predict(numpy.array([0.0, 1.0]))
    0.0
    >>> model.predict(numpy.array([1.0, 0.0]))
    1.0
    >>> model.predict(sc.parallelize([[1.0, 0.0]])).collect()
    [1.0]
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
    >>> model.predict(SparseVector(2, {1: 1.0}))
    0.0
    >>> model.predict(SparseVector(2, {0: 1.0}))
    1.0
    >>> import os, tempfile
    >>> path = tempfile.mkdtemp()
    >>> model.save(sc, path)
    >>> sameModel = NaiveBayesModel.load(sc, path)
    >>> sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0}))
    True
    >>> from shutil import rmtree
    >>> try:
    ...     rmtree(path)
    ... except OSError:
    ...     pass

    .. versionadded:: 0.9.0
    """
    def __init__(self, labels, pi, theta):
        self.labels = labels
        self.pi = pi
        self.theta = theta

    @since('0.9.0')
    def predict(self, x):
        """
        Return the most likely class for a data vector
        or an RDD of vectors.

        :param x:
          A single feature vector (anything accepted by
          ``_convert_to_vector``) or an RDD of such vectors.
        :return:
          For an RDD, an RDD of predicted labels; otherwise the entry
          of `labels` whose class score is highest.
        """
        if isinstance(x, RDD):
            return x.map(lambda point: self.predict(point))

        features = _convert_to_vector(x)
        # Per-class score: pi plus the features projected onto theta.
        class_scores = self.pi + features.dot(self.theta.transpose())
        winner = numpy.argmax(class_scores)
        return self.labels[winner]

    def save(self, sc, path):
        """
        Write this model to the given path.

        `labels`, `pi` and `theta` are converted with ``tolist()``
        before being handed to the JVM, so they are expected to be
        array objects exposing that method.

        :param sc:
          The SparkContext used to reach the JVM.
        :param path:
          Destination path handed to the JVM model's ``save``.
        """
        java_labels = _py2java(sc, self.labels.tolist())
        java_pi = _py2java(sc, self.pi.tolist())
        java_theta = _py2java(sc, self.theta.tolist())
        jvm_classification = sc._jvm.org.apache.spark.mllib.classification
        java_model = jvm_classification.NaiveBayesModel(java_labels, java_pi, java_theta)
        java_model.save(sc._jsc.sc(), path)

    @classmethod
    @since('1.4.0')
    def load(cls, sc, path):
        """
        Read a model back from the given path.

        :param sc:
          The SparkContext used to reach the JVM.
        :param path:
          Path previously written by :meth:`save`.
        :return:
          A NaiveBayesModel built from the labels, pi and theta of the
          loaded JVM model; theta is wrapped in a numpy array.
        """
        jvm_classification = sc._jvm.org.apache.spark.mllib.classification
        java_model = jvm_classification.NaiveBayesModel.load(sc._jsc.sc(), path)

        # "latin1" is forwarded to _java2py as its third argument.
        # NOTE(review): presumably the encoding used when unpickling the
        # arrays coming back from the JVM - confirm against _java2py.
        py_labels = _java2py(sc, java_model.labels(), "latin1")
        py_pi = _java2py(sc, java_model.pi(), "latin1")
        py_theta = _java2py(sc, java_model.theta(), "latin1")
        return NaiveBayesModel(py_labels, py_pi, numpy.array(py_theta))
0648
0649
class NaiveBayes(object):
    """
    Trainer for Naive Bayes models.

    .. versionadded:: 0.9.0
    """

    @classmethod
    @since('0.9.0')
    def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features)
        vectors.

        This is the `Multinomial NB <http://tinyurl.com/lsdw6p>`_ which
        can handle all kinds of discrete data.  For example, by
        converting documents into TF-IDF vectors, it can be used for
        document classification. By making every vector a 0-1 vector,
        it can also be used as `Bernoulli NB <http://tinyurl.com/p7c96j6>`_.
        The input feature values must be nonnegative.

        Only the first record of `data` is type-checked.

        :param data:
          RDD of LabeledPoint.
        :param lambda_:
          The smoothing parameter.
          (default: 1.0)
        :return:
          A NaiveBayesModel built from the labels, pi and theta
          returned by the JVM trainer.
        :raises ValueError:
          If the first record of `data` is not a LabeledPoint.
        """
        sample = data.first()
        if isinstance(sample, LabeledPoint):
            fitted = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
            labels, pi, theta = fitted
            return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
        raise ValueError("`data` should be an RDD of LabeledPoint")
0680
0681
@inherit_doc
class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
    """
    Train or predict a logistic regression model on streaming data.
    Training uses Stochastic Gradient Descent to update the model based on
    each new batch of incoming data from a DStream.

    Each batch of data is assumed to be an RDD of LabeledPoints.
    The number of data points per batch can vary, but the number
    of features must be constant. An initial weight
    vector must be provided.

    :param stepSize:
      Step size for each iteration of gradient descent.
      (default: 0.1)
    :param numIterations:
      Number of iterations run for each batch of data.
      (default: 50)
    :param miniBatchFraction:
      Fraction of each batch of data to use for updates.
      (default: 1.0)
    :param regParam:
      L2 Regularization parameter.
      (default: 0.0)
    :param convergenceTol:
      Value used to determine when to terminate iterations.
      (default: 0.001)

    .. versionadded:: 1.5.0
    """
    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.0,
                 convergenceTol=0.001):
        self.numIterations = numIterations
        self.stepSize = stepSize
        self.miniBatchFraction = miniBatchFraction
        self.regParam = regParam
        self.convergenceTol = convergenceTol
        # No model exists until setInitialWeights is called.
        self._model = None
        super(StreamingLogisticRegressionWithSGD, self).__init__(
            model=self._model)

    @since('1.5.0')
    def setInitialWeights(self, initialWeights):
        """
        Set the initial value of weights.

        This must be set before running trainOn and predictOn.

        :param initialWeights:
          Anything accepted by ``_convert_to_vector``.
        :return:
          This instance, so calls can be chained.
        """
        weight_vector = _convert_to_vector(initialWeights)

        # The starting model is binary (numClasses=2) with a zero
        # intercept and one feature per initial weight.
        self._model = LogisticRegressionModel(
            weight_vector, 0, weight_vector.size, 2)
        return self

    @since('1.5.0')
    def trainOn(self, dstream):
        """Train the model on the incoming dstream."""
        self._validate(dstream)

        def refit(batch):
            # Empty batches are skipped and leave the model unchanged.
            # NOTE(review): presumably training cannot run on an empty
            # RDD - confirm.
            if batch.isEmpty():
                return
            # Warm-start from the current weights. Only the weights are
            # carried over; the current intercept is not passed on.
            self._model = LogisticRegressionWithSGD.train(
                batch,
                iterations=self.numIterations,
                step=self.stepSize,
                miniBatchFraction=self.miniBatchFraction,
                initialWeights=self._model.weights,
                regParam=self.regParam,
                convergenceTol=self.convergenceTol)

        dstream.foreachRDD(refit)
0751
0752
def _test():
    """
    Run the doctests with a local SparkContext bound to the name `sc`.

    A SparkSession is created on ``local[4]`` and its SparkContext is
    added to a copy of this module's namespace, which is used as the
    doctest globals. The session is always stopped afterwards, and the
    process exits with status -1 if any doctest failed.
    """
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.mllib.classification
    globs = pyspark.mllib.classification.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("mllib.classification tests")\
        .getOrCreate()
    globs['sc'] = spark.sparkContext
    try:
        (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Release the session even if the doctest run itself raises.
        spark.stop()
    if failure_count:
        sys.exit(-1)
0767
# Run the doctest driver when this file is executed as a script.
if __name__ == "__main__":
    _test()