Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 from math import exp
0019 import sys
0020 import warnings
0021 
0022 import numpy
0023 
0024 from pyspark import RDD, since
0025 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
0026 from pyspark.mllib.linalg import SparseVector, _convert_to_vector
0027 from pyspark.mllib.regression import (
0028     LabeledPoint, LinearModel, _regression_train_wrapper,
0029     StreamingLinearAlgorithm)
0030 from pyspark.mllib.util import Saveable, Loader, inherit_doc
0031 
0032 
0033 __all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'LogisticRegressionWithLBFGS',
0034            'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes',
0035            'StreamingLogisticRegressionWithSGD']
0036 
0037 
0038 class LinearClassificationModel(LinearModel):
0039     """
0040     A private abstract class representing a multiclass classification
0041     model. The categories are represented by int values: 0, 1, 2, etc.
0042     """
0043     def __init__(self, weights, intercept):
0044         super(LinearClassificationModel, self).__init__(weights, intercept)
0045         self._threshold = None
0046 
0047     @since('1.4.0')
0048     def setThreshold(self, value):
0049         """
0050         Sets the threshold that separates positive predictions from
0051         negative predictions. An example with prediction score greater
0052         than or equal to this threshold is identified as a positive,
0053         and negative otherwise. It is used for binary classification
0054         only.
0055         """
0056         self._threshold = value
0057 
0058     @property
0059     @since('1.4.0')
0060     def threshold(self):
0061         """
0062         Returns the threshold (if any) used for converting raw
0063         prediction scores into 0/1 predictions. It is used for
0064         binary classification only.
0065         """
0066         return self._threshold
0067 
0068     @since('1.4.0')
0069     def clearThreshold(self):
0070         """
0071         Clears the threshold so that `predict` will output raw
0072         prediction scores. It is used for binary classification only.
0073         """
0074         self._threshold = None
0075 
0076     @since('1.4.0')
0077     def predict(self, test):
0078         """
0079         Predict values for a single data point or an RDD of points
0080         using the model trained.
0081         """
0082         raise NotImplementedError
0083 
0084 
0085 class LogisticRegressionModel(LinearClassificationModel):
0086 
0087     """
0088     Classification model trained using Multinomial/Binary Logistic
0089     Regression.
0090 
0091     :param weights:
0092       Weights computed for every feature.
0093     :param intercept:
0094       Intercept computed for this model. (Only used in Binary Logistic
0095       Regression. In Multinomial Logistic Regression, the intercepts will
0096       not bea single value, so the intercepts will be part of the
0097       weights.)
0098     :param numFeatures:
0099       The dimension of the features.
0100     :param numClasses:
0101       The number of possible outcomes for k classes classification problem
0102       in Multinomial Logistic Regression. By default, it is binary
0103       logistic regression so numClasses will be set to 2.
0104 
0105     >>> data = [
0106     ...     LabeledPoint(0.0, [0.0, 1.0]),
0107     ...     LabeledPoint(1.0, [1.0, 0.0]),
0108     ... ]
0109     >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)
0110     >>> lrm.predict([1.0, 0.0])
0111     1
0112     >>> lrm.predict([0.0, 1.0])
0113     0
0114     >>> lrm.predict(sc.parallelize([[1.0, 0.0], [0.0, 1.0]])).collect()
0115     [1, 0]
0116     >>> lrm.clearThreshold()
0117     >>> lrm.predict([0.0, 1.0])
0118     0.279...
0119 
0120     >>> sparse_data = [
0121     ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
0122     ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
0123     ...     LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
0124     ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
0125     ... ]
0126     >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10)
0127     >>> lrm.predict(numpy.array([0.0, 1.0]))
0128     1
0129     >>> lrm.predict(numpy.array([1.0, 0.0]))
0130     0
0131     >>> lrm.predict(SparseVector(2, {1: 1.0}))
0132     1
0133     >>> lrm.predict(SparseVector(2, {0: 1.0}))
0134     0
0135     >>> import os, tempfile
0136     >>> path = tempfile.mkdtemp()
0137     >>> lrm.save(sc, path)
0138     >>> sameModel = LogisticRegressionModel.load(sc, path)
0139     >>> sameModel.predict(numpy.array([0.0, 1.0]))
0140     1
0141     >>> sameModel.predict(SparseVector(2, {0: 1.0}))
0142     0
0143     >>> from shutil import rmtree
0144     >>> try:
0145     ...    rmtree(path)
0146     ... except:
0147     ...    pass
0148     >>> multi_class_data = [
0149     ...     LabeledPoint(0.0, [0.0, 1.0, 0.0]),
0150     ...     LabeledPoint(1.0, [1.0, 0.0, 0.0]),
0151     ...     LabeledPoint(2.0, [0.0, 0.0, 1.0])
0152     ... ]
0153     >>> data = sc.parallelize(multi_class_data)
0154     >>> mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3)
0155     >>> mcm.predict([0.0, 0.5, 0.0])
0156     0
0157     >>> mcm.predict([0.8, 0.0, 0.0])
0158     1
0159     >>> mcm.predict([0.0, 0.0, 0.3])
0160     2
0161 
0162     .. versionadded:: 0.9.0
0163     """
0164     def __init__(self, weights, intercept, numFeatures, numClasses):
0165         super(LogisticRegressionModel, self).__init__(weights, intercept)
0166         self._numFeatures = int(numFeatures)
0167         self._numClasses = int(numClasses)
0168         self._threshold = 0.5
0169         if self._numClasses == 2:
0170             self._dataWithBiasSize = None
0171             self._weightsMatrix = None
0172         else:
0173             self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1)
0174             self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1,
0175                                                                 self._dataWithBiasSize)
0176 
0177     @property
0178     @since('1.4.0')
0179     def numFeatures(self):
0180         """
0181         Dimension of the features.
0182         """
0183         return self._numFeatures
0184 
0185     @property
0186     @since('1.4.0')
0187     def numClasses(self):
0188         """
0189         Number of possible outcomes for k classes classification problem
0190         in Multinomial Logistic Regression.
0191         """
0192         return self._numClasses
0193 
0194     @since('0.9.0')
0195     def predict(self, x):
0196         """
0197         Predict values for a single data point or an RDD of points
0198         using the model trained.
0199         """
0200         if isinstance(x, RDD):
0201             return x.map(lambda v: self.predict(v))
0202 
0203         x = _convert_to_vector(x)
0204         if self.numClasses == 2:
0205             margin = self.weights.dot(x) + self._intercept
0206             if margin > 0:
0207                 prob = 1 / (1 + exp(-margin))
0208             else:
0209                 exp_margin = exp(margin)
0210                 prob = exp_margin / (1 + exp_margin)
0211             if self._threshold is None:
0212                 return prob
0213             else:
0214                 return 1 if prob > self._threshold else 0
0215         else:
0216             best_class = 0
0217             max_margin = 0.0
0218             if x.size + 1 == self._dataWithBiasSize:
0219                 for i in range(0, self._numClasses - 1):
0220                     margin = x.dot(self._weightsMatrix[i][0:x.size]) + \
0221                         self._weightsMatrix[i][x.size]
0222                     if margin > max_margin:
0223                         max_margin = margin
0224                         best_class = i + 1
0225             else:
0226                 for i in range(0, self._numClasses - 1):
0227                     margin = x.dot(self._weightsMatrix[i])
0228                     if margin > max_margin:
0229                         max_margin = margin
0230                         best_class = i + 1
0231             return best_class
0232 
0233     @since('1.4.0')
0234     def save(self, sc, path):
0235         """
0236         Save this model to the given path.
0237         """
0238         java_model = sc._jvm.org.apache.spark.mllib.classification.LogisticRegressionModel(
0239             _py2java(sc, self._coeff), self.intercept, self.numFeatures, self.numClasses)
0240         java_model.save(sc._jsc.sc(), path)
0241 
0242     @classmethod
0243     @since('1.4.0')
0244     def load(cls, sc, path):
0245         """
0246         Load a model from the given path.
0247         """
0248         java_model = sc._jvm.org.apache.spark.mllib.classification.LogisticRegressionModel.load(
0249             sc._jsc.sc(), path)
0250         weights = _java2py(sc, java_model.weights())
0251         intercept = java_model.intercept()
0252         numFeatures = java_model.numFeatures()
0253         numClasses = java_model.numClasses()
0254         threshold = java_model.getThreshold().get()
0255         model = LogisticRegressionModel(weights, intercept, numFeatures, numClasses)
0256         model.setThreshold(threshold)
0257         return model
0258 
0259     def __repr__(self):
0260         return self._call_java("toString")
0261 
0262 
0263 class LogisticRegressionWithSGD(object):
0264     """
0265     .. versionadded:: 0.9.0
0266     .. note:: Deprecated in 2.0.0. Use ml.classification.LogisticRegression or
0267             LogisticRegressionWithLBFGS.
0268     """
0269     @classmethod
0270     @since('0.9.0')
0271     def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
0272               initialWeights=None, regParam=0.01, regType="l2", intercept=False,
0273               validateData=True, convergenceTol=0.001):
0274         """
0275         Train a logistic regression model on the given data.
0276 
0277         :param data:
0278           The training data, an RDD of LabeledPoint.
0279         :param iterations:
0280           The number of iterations.
0281           (default: 100)
0282         :param step:
0283           The step parameter used in SGD.
0284           (default: 1.0)
0285         :param miniBatchFraction:
0286           Fraction of data to be used for each SGD iteration.
0287           (default: 1.0)
0288         :param initialWeights:
0289           The initial weights.
0290           (default: None)
0291         :param regParam:
0292           The regularizer parameter.
0293           (default: 0.01)
0294         :param regType:
0295           The type of regularizer used for training our model.
0296           Supported values:
0297 
0298             - "l1" for using L1 regularization
0299             - "l2" for using L2 regularization (default)
0300             - None for no regularization
0301         :param intercept:
0302           Boolean parameter which indicates the use or not of the
0303           augmented representation for training data (i.e., whether bias
0304           features are activated or not).
0305           (default: False)
0306         :param validateData:
0307           Boolean parameter which indicates if the algorithm should
0308           validate data before training.
0309           (default: True)
0310         :param convergenceTol:
0311           A condition which decides iteration termination.
0312           (default: 0.001)
0313         """
0314         warnings.warn(
0315             "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "
0316             "LogisticRegressionWithLBFGS.", DeprecationWarning)
0317 
0318         def train(rdd, i):
0319             return callMLlibFunc("trainLogisticRegressionModelWithSGD", rdd, int(iterations),
0320                                  float(step), float(miniBatchFraction), i, float(regParam), regType,
0321                                  bool(intercept), bool(validateData), float(convergenceTol))
0322 
0323         return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
0324 
0325 
0326 class LogisticRegressionWithLBFGS(object):
0327     """
0328     .. versionadded:: 1.2.0
0329     """
0330     @classmethod
0331     @since('1.2.0')
0332     def train(cls, data, iterations=100, initialWeights=None, regParam=0.0, regType="l2",
0333               intercept=False, corrections=10, tolerance=1e-6, validateData=True, numClasses=2):
0334         """
0335         Train a logistic regression model on the given data.
0336 
0337         :param data:
0338           The training data, an RDD of LabeledPoint.
0339         :param iterations:
0340           The number of iterations.
0341           (default: 100)
0342         :param initialWeights:
0343           The initial weights.
0344           (default: None)
0345         :param regParam:
0346           The regularizer parameter.
0347           (default: 0.0)
0348         :param regType:
0349           The type of regularizer used for training our model.
0350           Supported values:
0351 
0352             - "l1" for using L1 regularization
0353             - "l2" for using L2 regularization (default)
0354             - None for no regularization
0355         :param intercept:
0356           Boolean parameter which indicates the use or not of the
0357           augmented representation for training data (i.e., whether bias
0358           features are activated or not).
0359           (default: False)
0360         :param corrections:
0361           The number of corrections used in the LBFGS update.
0362           If a known updater is used for binary classification,
0363           it calls the ml implementation and this parameter will
0364           have no effect. (default: 10)
0365         :param tolerance:
0366           The convergence tolerance of iterations for L-BFGS.
0367           (default: 1e-6)
0368         :param validateData:
0369           Boolean parameter which indicates if the algorithm should
0370           validate data before training.
0371           (default: True)
0372         :param numClasses:
0373           The number of classes (i.e., outcomes) a label can take in
0374           Multinomial Logistic Regression.
0375           (default: 2)
0376 
0377         >>> data = [
0378         ...     LabeledPoint(0.0, [0.0, 1.0]),
0379         ...     LabeledPoint(1.0, [1.0, 0.0]),
0380         ... ]
0381         >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data), iterations=10)
0382         >>> lrm.predict([1.0, 0.0])
0383         1
0384         >>> lrm.predict([0.0, 1.0])
0385         0
0386         """
0387         def train(rdd, i):
0388             return callMLlibFunc("trainLogisticRegressionModelWithLBFGS", rdd, int(iterations), i,
0389                                  float(regParam), regType, bool(intercept), int(corrections),
0390                                  float(tolerance), bool(validateData), int(numClasses))
0391 
0392         if initialWeights is None:
0393             if numClasses == 2:
0394                 initialWeights = [0.0] * len(data.first().features)
0395             else:
0396                 if intercept:
0397                     initialWeights = [0.0] * (len(data.first().features) + 1) * (numClasses - 1)
0398                 else:
0399                     initialWeights = [0.0] * len(data.first().features) * (numClasses - 1)
0400         return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
0401 
0402 
0403 class SVMModel(LinearClassificationModel):
0404 
0405     """
0406     Model for Support Vector Machines (SVMs).
0407 
0408     :param weights:
0409       Weights computed for every feature.
0410     :param intercept:
0411       Intercept computed for this model.
0412 
0413     >>> data = [
0414     ...     LabeledPoint(0.0, [0.0]),
0415     ...     LabeledPoint(1.0, [1.0]),
0416     ...     LabeledPoint(1.0, [2.0]),
0417     ...     LabeledPoint(1.0, [3.0])
0418     ... ]
0419     >>> svm = SVMWithSGD.train(sc.parallelize(data), iterations=10)
0420     >>> svm.predict([1.0])
0421     1
0422     >>> svm.predict(sc.parallelize([[1.0]])).collect()
0423     [1]
0424     >>> svm.clearThreshold()
0425     >>> svm.predict(numpy.array([1.0]))
0426     1.44...
0427 
0428     >>> sparse_data = [
0429     ...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
0430     ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
0431     ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
0432     ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
0433     ... ]
0434     >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data), iterations=10)
0435     >>> svm.predict(SparseVector(2, {1: 1.0}))
0436     1
0437     >>> svm.predict(SparseVector(2, {0: -1.0}))
0438     0
0439     >>> import os, tempfile
0440     >>> path = tempfile.mkdtemp()
0441     >>> svm.save(sc, path)
0442     >>> sameModel = SVMModel.load(sc, path)
0443     >>> sameModel.predict(SparseVector(2, {1: 1.0}))
0444     1
0445     >>> sameModel.predict(SparseVector(2, {0: -1.0}))
0446     0
0447     >>> from shutil import rmtree
0448     >>> try:
0449     ...    rmtree(path)
0450     ... except:
0451     ...    pass
0452 
0453     .. versionadded:: 0.9.0
0454     """
0455     def __init__(self, weights, intercept):
0456         super(SVMModel, self).__init__(weights, intercept)
0457         self._threshold = 0.0
0458 
0459     @since('0.9.0')
0460     def predict(self, x):
0461         """
0462         Predict values for a single data point or an RDD of points
0463         using the model trained.
0464         """
0465         if isinstance(x, RDD):
0466             return x.map(lambda v: self.predict(v))
0467 
0468         x = _convert_to_vector(x)
0469         margin = self.weights.dot(x) + self.intercept
0470         if self._threshold is None:
0471             return margin
0472         else:
0473             return 1 if margin > self._threshold else 0
0474 
0475     @since('1.4.0')
0476     def save(self, sc, path):
0477         """
0478         Save this model to the given path.
0479         """
0480         java_model = sc._jvm.org.apache.spark.mllib.classification.SVMModel(
0481             _py2java(sc, self._coeff), self.intercept)
0482         java_model.save(sc._jsc.sc(), path)
0483 
0484     @classmethod
0485     @since('1.4.0')
0486     def load(cls, sc, path):
0487         """
0488         Load a model from the given path.
0489         """
0490         java_model = sc._jvm.org.apache.spark.mllib.classification.SVMModel.load(
0491             sc._jsc.sc(), path)
0492         weights = _java2py(sc, java_model.weights())
0493         intercept = java_model.intercept()
0494         threshold = java_model.getThreshold().get()
0495         model = SVMModel(weights, intercept)
0496         model.setThreshold(threshold)
0497         return model
0498 
0499 
0500 class SVMWithSGD(object):
0501     """
0502     .. versionadded:: 0.9.0
0503     """
0504 
0505     @classmethod
0506     @since('0.9.0')
0507     def train(cls, data, iterations=100, step=1.0, regParam=0.01,
0508               miniBatchFraction=1.0, initialWeights=None, regType="l2",
0509               intercept=False, validateData=True, convergenceTol=0.001):
0510         """
0511         Train a support vector machine on the given data.
0512 
0513         :param data:
0514           The training data, an RDD of LabeledPoint.
0515         :param iterations:
0516           The number of iterations.
0517           (default: 100)
0518         :param step:
0519           The step parameter used in SGD.
0520           (default: 1.0)
0521         :param regParam:
0522           The regularizer parameter.
0523           (default: 0.01)
0524         :param miniBatchFraction:
0525           Fraction of data to be used for each SGD iteration.
0526           (default: 1.0)
0527         :param initialWeights:
0528           The initial weights.
0529           (default: None)
0530         :param regType:
0531           The type of regularizer used for training our model.
0532           Allowed values:
0533 
0534             - "l1" for using L1 regularization
0535             - "l2" for using L2 regularization (default)
0536             - None for no regularization
0537         :param intercept:
0538           Boolean parameter which indicates the use or not of the
0539           augmented representation for training data (i.e. whether bias
0540           features are activated or not).
0541           (default: False)
0542         :param validateData:
0543           Boolean parameter which indicates if the algorithm should
0544           validate data before training.
0545           (default: True)
0546         :param convergenceTol:
0547           A condition which decides iteration termination.
0548           (default: 0.001)
0549         """
0550         def train(rdd, i):
0551             return callMLlibFunc("trainSVMModelWithSGD", rdd, int(iterations), float(step),
0552                                  float(regParam), float(miniBatchFraction), i, regType,
0553                                  bool(intercept), bool(validateData), float(convergenceTol))
0554 
0555         return _regression_train_wrapper(train, SVMModel, data, initialWeights)
0556 
0557 
0558 @inherit_doc
0559 class NaiveBayesModel(Saveable, Loader):
0560 
0561     """
0562     Model for Naive Bayes classifiers.
0563 
0564     :param labels:
0565       List of labels.
0566     :param pi:
0567       Log of class priors, whose dimension is C, number of labels.
0568     :param theta:
0569       Log of class conditional probabilities, whose dimension is C-by-D,
0570       where D is number of features.
0571 
0572     >>> data = [
0573     ...     LabeledPoint(0.0, [0.0, 0.0]),
0574     ...     LabeledPoint(0.0, [0.0, 1.0]),
0575     ...     LabeledPoint(1.0, [1.0, 0.0]),
0576     ... ]
0577     >>> model = NaiveBayes.train(sc.parallelize(data))
0578     >>> model.predict(numpy.array([0.0, 1.0]))
0579     0.0
0580     >>> model.predict(numpy.array([1.0, 0.0]))
0581     1.0
0582     >>> model.predict(sc.parallelize([[1.0, 0.0]])).collect()
0583     [1.0]
0584     >>> sparse_data = [
0585     ...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
0586     ...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
0587     ...     LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
0588     ... ]
0589     >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
0590     >>> model.predict(SparseVector(2, {1: 1.0}))
0591     0.0
0592     >>> model.predict(SparseVector(2, {0: 1.0}))
0593     1.0
0594     >>> import os, tempfile
0595     >>> path = tempfile.mkdtemp()
0596     >>> model.save(sc, path)
0597     >>> sameModel = NaiveBayesModel.load(sc, path)
0598     >>> sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0}))
0599     True
0600     >>> from shutil import rmtree
0601     >>> try:
0602     ...     rmtree(path)
0603     ... except OSError:
0604     ...     pass
0605 
0606     .. versionadded:: 0.9.0
0607     """
0608     def __init__(self, labels, pi, theta):
0609         self.labels = labels
0610         self.pi = pi
0611         self.theta = theta
0612 
0613     @since('0.9.0')
0614     def predict(self, x):
0615         """
0616         Return the most likely class for a data vector
0617         or an RDD of vectors
0618         """
0619         if isinstance(x, RDD):
0620             return x.map(lambda v: self.predict(v))
0621         x = _convert_to_vector(x)
0622         return self.labels[numpy.argmax(self.pi + x.dot(self.theta.transpose()))]
0623 
0624     def save(self, sc, path):
0625         """
0626         Save this model to the given path.
0627         """
0628         java_labels = _py2java(sc, self.labels.tolist())
0629         java_pi = _py2java(sc, self.pi.tolist())
0630         java_theta = _py2java(sc, self.theta.tolist())
0631         java_model = sc._jvm.org.apache.spark.mllib.classification.NaiveBayesModel(
0632             java_labels, java_pi, java_theta)
0633         java_model.save(sc._jsc.sc(), path)
0634 
0635     @classmethod
0636     @since('1.4.0')
0637     def load(cls, sc, path):
0638         """
0639         Load a model from the given path.
0640         """
0641         java_model = sc._jvm.org.apache.spark.mllib.classification.NaiveBayesModel.load(
0642             sc._jsc.sc(), path)
0643         # Can not unpickle array.array from Pyrolite in Python3 with "bytes"
0644         py_labels = _java2py(sc, java_model.labels(), "latin1")
0645         py_pi = _java2py(sc, java_model.pi(), "latin1")
0646         py_theta = _java2py(sc, java_model.theta(), "latin1")
0647         return NaiveBayesModel(py_labels, py_pi, numpy.array(py_theta))
0648 
0649 
0650 class NaiveBayes(object):
0651     """
0652     .. versionadded:: 0.9.0
0653     """
0654 
0655     @classmethod
0656     @since('0.9.0')
0657     def train(cls, data, lambda_=1.0):
0658         """
0659         Train a Naive Bayes model given an RDD of (label, features)
0660         vectors.
0661 
0662         This is the `Multinomial NB <http://tinyurl.com/lsdw6p>`_ which
0663         can handle all kinds of discrete data.  For example, by
0664         converting documents into TF-IDF vectors, it can be used for
0665         document classification. By making every vector a 0-1 vector,
0666         it can also be used as `Bernoulli NB <http://tinyurl.com/p7c96j6>`_.
0667         The input feature values must be nonnegative.
0668 
0669         :param data:
0670           RDD of LabeledPoint.
0671         :param lambda_:
0672           The smoothing parameter.
0673           (default: 1.0)
0674         """
0675         first = data.first()
0676         if not isinstance(first, LabeledPoint):
0677             raise ValueError("`data` should be an RDD of LabeledPoint")
0678         labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
0679         return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
0680 
0681 
0682 @inherit_doc
0683 class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
0684     """
0685     Train or predict a logistic regression model on streaming data.
0686     Training uses Stochastic Gradient Descent to update the model based on
0687     each new batch of incoming data from a DStream.
0688 
0689     Each batch of data is assumed to be an RDD of LabeledPoints.
0690     The number of data points per batch can vary, but the number
0691     of features must be constant. An initial weight
0692     vector must be provided.
0693 
0694     :param stepSize:
0695       Step size for each iteration of gradient descent.
0696       (default: 0.1)
0697     :param numIterations:
0698       Number of iterations run for each batch of data.
0699       (default: 50)
0700     :param miniBatchFraction:
0701       Fraction of each batch of data to use for updates.
0702       (default: 1.0)
0703     :param regParam:
0704       L2 Regularization parameter.
0705       (default: 0.0)
0706     :param convergenceTol:
0707       Value used to determine when to terminate iterations.
0708       (default: 0.001)
0709 
0710     .. versionadded:: 1.5.0
0711     """
0712     def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, regParam=0.0,
0713                  convergenceTol=0.001):
0714         self.stepSize = stepSize
0715         self.numIterations = numIterations
0716         self.regParam = regParam
0717         self.miniBatchFraction = miniBatchFraction
0718         self.convergenceTol = convergenceTol
0719         self._model = None
0720         super(StreamingLogisticRegressionWithSGD, self).__init__(
0721             model=self._model)
0722 
0723     @since('1.5.0')
0724     def setInitialWeights(self, initialWeights):
0725         """
0726         Set the initial value of weights.
0727 
0728         This must be set before running trainOn and predictOn.
0729         """
0730         initialWeights = _convert_to_vector(initialWeights)
0731 
0732         # LogisticRegressionWithSGD does only binary classification.
0733         self._model = LogisticRegressionModel(
0734             initialWeights, 0, initialWeights.size, 2)
0735         return self
0736 
0737     @since('1.5.0')
0738     def trainOn(self, dstream):
0739         """Train the model on the incoming dstream."""
0740         self._validate(dstream)
0741 
0742         def update(rdd):
0743             # LogisticRegressionWithSGD.train raises an error for an empty RDD.
0744             if not rdd.isEmpty():
0745                 self._model = LogisticRegressionWithSGD.train(
0746                     rdd, self.numIterations, self.stepSize,
0747                     self.miniBatchFraction, self._model.weights,
0748                     regParam=self.regParam, convergenceTol=self.convergenceTol)
0749 
0750         dstream.foreachRDD(update)
0751 
0752 
0753 def _test():
0754     import doctest
0755     from pyspark.sql import SparkSession
0756     import pyspark.mllib.classification
0757     globs = pyspark.mllib.classification.__dict__.copy()
0758     spark = SparkSession.builder\
0759         .master("local[4]")\
0760         .appName("mllib.classification tests")\
0761         .getOrCreate()
0762     globs['sc'] = spark.sparkContext
0763     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
0764     spark.stop()
0765     if failure_count:
0766         sys.exit(-1)
0767 
0768 if __name__ == "__main__":
0769     _test()