Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import sys
0019 
0020 from pyspark import since, SparkContext
0021 from pyspark.ml.common import _java2py, _py2java
0022 from pyspark.ml.linalg import DenseMatrix, Vectors
0023 from pyspark.ml.wrapper import JavaWrapper, _jvm
0024 from pyspark.sql.column import Column, _to_seq
0025 from pyspark.sql.functions import lit
0026 
0027 
0028 class ChiSquareTest(object):
0029     """
0030     Conduct Pearson's independence test for every feature against the label. For each feature,
0031     the (feature, label) pairs are converted into a contingency matrix for which the Chi-squared
0032     statistic is computed. All label and feature values must be categorical.
0033 
0034     The null hypothesis is that the occurrence of the outcomes is statistically independent.
0035 
0036     .. versionadded:: 2.2.0
0037 
0038     """
0039     @staticmethod
0040     @since("2.2.0")
0041     def test(dataset, featuresCol, labelCol):
0042         """
0043         Perform a Pearson's independence test using dataset.
0044 
0045         :param dataset:
0046           DataFrame of categorical labels and categorical features.
0047           Real-valued features will be treated as categorical for each distinct value.
0048         :param featuresCol:
0049           Name of features column in dataset, of type `Vector` (`VectorUDT`).
0050         :param labelCol:
0051           Name of label column in dataset, of any numerical type.
0052         :return:
0053           DataFrame containing the test result for every feature against the label.
0054           This DataFrame will contain a single Row with the following fields:
0055           - `pValues: Vector`
0056           - `degreesOfFreedom: Array[Int]`
0057           - `statistics: Vector`
0058           Each of these fields has one value per feature.
0059 
0060         >>> from pyspark.ml.linalg import Vectors
0061         >>> from pyspark.ml.stat import ChiSquareTest
0062         >>> dataset = [[0, Vectors.dense([0, 0, 1])],
0063         ...            [0, Vectors.dense([1, 0, 1])],
0064         ...            [1, Vectors.dense([2, 1, 1])],
0065         ...            [1, Vectors.dense([3, 1, 1])]]
0066         >>> dataset = spark.createDataFrame(dataset, ["label", "features"])
0067         >>> chiSqResult = ChiSquareTest.test(dataset, 'features', 'label')
0068         >>> chiSqResult.select("degreesOfFreedom").collect()[0]
0069         Row(degreesOfFreedom=[3, 1, 0])
0070         """
0071         sc = SparkContext._active_spark_context
0072         javaTestObj = _jvm().org.apache.spark.ml.stat.ChiSquareTest
0073         args = [_py2java(sc, arg) for arg in (dataset, featuresCol, labelCol)]
0074         return _java2py(sc, javaTestObj.test(*args))
0075 
0076 
0077 class Correlation(object):
0078     """
0079     Compute the correlation matrix for the input dataset of Vectors using the specified method.
0080     Methods currently supported: `pearson` (default), `spearman`.
0081 
0082     .. note:: For Spearman, a rank correlation, we need to create an RDD[Double] for each column
0083       and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
0084       which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
0085       to avoid recomputing the common lineage.
0086 
0087     .. versionadded:: 2.2.0
0088 
0089     """
0090     @staticmethod
0091     @since("2.2.0")
0092     def corr(dataset, column, method="pearson"):
0093         """
0094         Compute the correlation matrix with specified method using dataset.
0095 
0096         :param dataset:
0097           A Dataset or a DataFrame.
0098         :param column:
0099           The name of the column of vectors for which the correlation coefficient needs
0100           to be computed. This must be a column of the dataset, and it must contain
0101           Vector objects.
0102         :param method:
0103           String specifying the method to use for computing correlation.
0104           Supported: `pearson` (default), `spearman`.
0105         :return:
0106           A DataFrame that contains the correlation matrix of the column of vectors. This
0107           DataFrame contains a single row and a single column of name
0108           '$METHODNAME($COLUMN)'.
0109 
0110         >>> from pyspark.ml.linalg import Vectors
0111         >>> from pyspark.ml.stat import Correlation
0112         >>> dataset = [[Vectors.dense([1, 0, 0, -2])],
0113         ...            [Vectors.dense([4, 5, 0, 3])],
0114         ...            [Vectors.dense([6, 7, 0, 8])],
0115         ...            [Vectors.dense([9, 0, 0, 1])]]
0116         >>> dataset = spark.createDataFrame(dataset, ['features'])
0117         >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
0118         >>> print(str(pearsonCorr).replace('nan', 'NaN'))
0119         DenseMatrix([[ 1.        ,  0.0556...,         NaN,  0.4004...],
0120                      [ 0.0556...,  1.        ,         NaN,  0.9135...],
0121                      [        NaN,         NaN,  1.        ,         NaN],
0122                      [ 0.4004...,  0.9135...,         NaN,  1.        ]])
0123         >>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
0124         >>> print(str(spearmanCorr).replace('nan', 'NaN'))
0125         DenseMatrix([[ 1.        ,  0.1054...,         NaN,  0.4       ],
0126                      [ 0.1054...,  1.        ,         NaN,  0.9486... ],
0127                      [        NaN,         NaN,  1.        ,         NaN],
0128                      [ 0.4       ,  0.9486... ,         NaN,  1.        ]])
0129         """
0130         sc = SparkContext._active_spark_context
0131         javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
0132         args = [_py2java(sc, arg) for arg in (dataset, column, method)]
0133         return _java2py(sc, javaCorrObj.corr(*args))
0134 
0135 
0136 class KolmogorovSmirnovTest(object):
0137     """
0138     Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous
0139     distribution.
0140 
0141     By comparing the largest difference between the empirical cumulative
0142     distribution of the sample data and the theoretical distribution we can provide a test for the
0143     the null hypothesis that the sample data comes from that theoretical distribution.
0144 
0145     .. versionadded:: 2.4.0
0146 
0147     """
0148     @staticmethod
0149     @since("2.4.0")
0150     def test(dataset, sampleCol, distName, *params):
0151         """
0152         Conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability distribution
0153         equality. Currently supports the normal distribution, taking as parameters the mean and
0154         standard deviation.
0155 
0156         :param dataset:
0157           a Dataset or a DataFrame containing the sample of data to test.
0158         :param sampleCol:
0159           Name of sample column in dataset, of any numerical type.
0160         :param distName:
0161           a `string` name for a theoretical distribution, currently only support "norm".
0162         :param params:
0163           a list of `Double` values specifying the parameters to be used for the theoretical
0164           distribution. For "norm" distribution, the parameters includes mean and variance.
0165         :return:
0166           A DataFrame that contains the Kolmogorov-Smirnov test result for the input sampled data.
0167           This DataFrame will contain a single Row with the following fields:
0168           - `pValue: Double`
0169           - `statistic: Double`
0170 
0171         >>> from pyspark.ml.stat import KolmogorovSmirnovTest
0172         >>> dataset = [[-1.0], [0.0], [1.0]]
0173         >>> dataset = spark.createDataFrame(dataset, ['sample'])
0174         >>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).first()
0175         >>> round(ksResult.pValue, 3)
0176         1.0
0177         >>> round(ksResult.statistic, 3)
0178         0.175
0179         >>> dataset = [[2.0], [3.0], [4.0]]
0180         >>> dataset = spark.createDataFrame(dataset, ['sample'])
0181         >>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).first()
0182         >>> round(ksResult.pValue, 3)
0183         1.0
0184         >>> round(ksResult.statistic, 3)
0185         0.175
0186         """
0187         sc = SparkContext._active_spark_context
0188         javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest
0189         dataset = _py2java(sc, dataset)
0190         params = [float(param) for param in params]
0191         return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName,
0192                                              _jvm().PythonUtils.toSeq(params)))
0193 
0194 
0195 class Summarizer(object):
0196     """
0197     Tools for vectorized statistics on MLlib Vectors.
0198     The methods in this package provide various statistics for Vectors contained inside DataFrames.
0199     This class lets users pick the statistics they would like to extract for a given column.
0200 
0201     >>> from pyspark.ml.stat import Summarizer
0202     >>> from pyspark.sql import Row
0203     >>> from pyspark.ml.linalg import Vectors
0204     >>> summarizer = Summarizer.metrics("mean", "count")
0205     >>> df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
0206     ...                      Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()
0207     >>> df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)
0208     +-----------------------------------+
0209     |aggregate_metrics(features, weight)|
0210     +-----------------------------------+
0211     |[[1.0,1.0,1.0], 1]                 |
0212     +-----------------------------------+
0213     <BLANKLINE>
0214     >>> df.select(summarizer.summary(df.features)).show(truncate=False)
0215     +--------------------------------+
0216     |aggregate_metrics(features, 1.0)|
0217     +--------------------------------+
0218     |[[1.0,1.5,2.0], 2]              |
0219     +--------------------------------+
0220     <BLANKLINE>
0221     >>> df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)
0222     +--------------+
0223     |mean(features)|
0224     +--------------+
0225     |[1.0,1.0,1.0] |
0226     +--------------+
0227     <BLANKLINE>
0228     >>> df.select(Summarizer.mean(df.features)).show(truncate=False)
0229     +--------------+
0230     |mean(features)|
0231     +--------------+
0232     |[1.0,1.5,2.0] |
0233     +--------------+
0234     <BLANKLINE>
0235 
0236     .. versionadded:: 2.4.0
0237 
0238     """
0239     @staticmethod
0240     @since("2.4.0")
0241     def mean(col, weightCol=None):
0242         """
0243         return a column of mean summary
0244         """
0245         return Summarizer._get_single_metric(col, weightCol, "mean")
0246 
0247     @staticmethod
0248     @since("3.0.0")
0249     def sum(col, weightCol=None):
0250         """
0251         return a column of sum summary
0252         """
0253         return Summarizer._get_single_metric(col, weightCol, "sum")
0254 
0255     @staticmethod
0256     @since("2.4.0")
0257     def variance(col, weightCol=None):
0258         """
0259         return a column of variance summary
0260         """
0261         return Summarizer._get_single_metric(col, weightCol, "variance")
0262 
0263     @staticmethod
0264     @since("3.0.0")
0265     def std(col, weightCol=None):
0266         """
0267         return a column of std summary
0268         """
0269         return Summarizer._get_single_metric(col, weightCol, "std")
0270 
0271     @staticmethod
0272     @since("2.4.0")
0273     def count(col, weightCol=None):
0274         """
0275         return a column of count summary
0276         """
0277         return Summarizer._get_single_metric(col, weightCol, "count")
0278 
0279     @staticmethod
0280     @since("2.4.0")
0281     def numNonZeros(col, weightCol=None):
0282         """
0283         return a column of numNonZero summary
0284         """
0285         return Summarizer._get_single_metric(col, weightCol, "numNonZeros")
0286 
0287     @staticmethod
0288     @since("2.4.0")
0289     def max(col, weightCol=None):
0290         """
0291         return a column of max summary
0292         """
0293         return Summarizer._get_single_metric(col, weightCol, "max")
0294 
0295     @staticmethod
0296     @since("2.4.0")
0297     def min(col, weightCol=None):
0298         """
0299         return a column of min summary
0300         """
0301         return Summarizer._get_single_metric(col, weightCol, "min")
0302 
0303     @staticmethod
0304     @since("2.4.0")
0305     def normL1(col, weightCol=None):
0306         """
0307         return a column of normL1 summary
0308         """
0309         return Summarizer._get_single_metric(col, weightCol, "normL1")
0310 
0311     @staticmethod
0312     @since("2.4.0")
0313     def normL2(col, weightCol=None):
0314         """
0315         return a column of normL2 summary
0316         """
0317         return Summarizer._get_single_metric(col, weightCol, "normL2")
0318 
0319     @staticmethod
0320     def _check_param(featuresCol, weightCol):
0321         if weightCol is None:
0322             weightCol = lit(1.0)
0323         if not isinstance(featuresCol, Column) or not isinstance(weightCol, Column):
0324             raise TypeError("featureCol and weightCol should be a Column")
0325         return featuresCol, weightCol
0326 
0327     @staticmethod
0328     def _get_single_metric(col, weightCol, metric):
0329         col, weightCol = Summarizer._check_param(col, weightCol)
0330         return Column(JavaWrapper._new_java_obj("org.apache.spark.ml.stat.Summarizer." + metric,
0331                                                 col._jc, weightCol._jc))
0332 
0333     @staticmethod
0334     @since("2.4.0")
0335     def metrics(*metrics):
0336         """
0337         Given a list of metrics, provides a builder that it turns computes metrics from a column.
0338 
0339         See the documentation of [[Summarizer]] for an example.
0340 
0341         The following metrics are accepted (case sensitive):
0342          - mean: a vector that contains the coefficient-wise mean.
0343          - sum: a vector that contains the coefficient-wise sum.
0344          - variance: a vector tha contains the coefficient-wise variance.
0345          - std: a vector tha contains the coefficient-wise standard deviation.
0346          - count: the count of all vectors seen.
0347          - numNonzeros: a vector with the number of non-zeros for each coefficients
0348          - max: the maximum for each coefficient.
0349          - min: the minimum for each coefficient.
0350          - normL2: the Euclidean norm for each coefficient.
0351          - normL1: the L1 norm of each coefficient (sum of the absolute values).
0352 
0353         :param metrics:
0354          metrics that can be provided.
0355         :return:
0356          an object of :py:class:`pyspark.ml.stat.SummaryBuilder`
0357 
0358         Note: Currently, the performance of this interface is about 2x~3x slower then using the RDD
0359         interface.
0360         """
0361         sc = SparkContext._active_spark_context
0362         js = JavaWrapper._new_java_obj("org.apache.spark.ml.stat.Summarizer.metrics",
0363                                        _to_seq(sc, metrics))
0364         return SummaryBuilder(js)
0365 
0366 
0367 class SummaryBuilder(JavaWrapper):
0368     """
0369     A builder object that provides summary statistics about a given column.
0370 
0371     Users should not directly create such builders, but instead use one of the methods in
0372     :py:class:`pyspark.ml.stat.Summarizer`
0373 
0374     .. versionadded:: 2.4.0
0375 
0376     """
0377     def __init__(self, jSummaryBuilder):
0378         super(SummaryBuilder, self).__init__(jSummaryBuilder)
0379 
0380     @since("2.4.0")
0381     def summary(self, featuresCol, weightCol=None):
0382         """
0383         Returns an aggregate object that contains the summary of the column with the requested
0384         metrics.
0385 
0386         :param featuresCol:
0387          a column that contains features Vector object.
0388         :param weightCol:
0389          a column that contains weight value. Default weight is 1.0.
0390         :return:
0391          an aggregate column that contains the statistics. The exact content of this
0392          structure is determined during the creation of the builder.
0393         """
0394         featuresCol, weightCol = Summarizer._check_param(featuresCol, weightCol)
0395         return Column(self._java_obj.summary(featuresCol._jc, weightCol._jc))
0396 
0397 
0398 class MultivariateGaussian(object):
0399     """Represents a (mean, cov) tuple
0400 
0401     >>> m = MultivariateGaussian(Vectors.dense([11,12]), DenseMatrix(2, 2, (1.0, 3.0, 5.0, 2.0)))
0402     >>> (m.mean, m.cov.toArray())
0403     (DenseVector([11.0, 12.0]), array([[ 1.,  5.],
0404            [ 3.,  2.]]))
0405 
0406     .. versionadded:: 3.0.0
0407 
0408     """
0409     def __init__(self, mean, cov):
0410         self.mean = mean
0411         self.cov = cov
0412 
0413 
0414 if __name__ == "__main__":
0415     import doctest
0416     import numpy
0417     import pyspark.ml.stat
0418     from pyspark.sql import SparkSession
0419     try:
0420         # Numpy 1.14+ changed it's string format.
0421         numpy.set_printoptions(legacy='1.13')
0422     except TypeError:
0423         pass
0424 
0425     globs = pyspark.ml.stat.__dict__.copy()
0426     # The small batch size here ensures that we see multiple batches,
0427     # even in these small test examples:
0428     spark = SparkSession.builder \
0429         .master("local[2]") \
0430         .appName("ml.stat tests") \
0431         .getOrCreate()
0432     sc = spark.sparkContext
0433     globs['sc'] = sc
0434     globs['spark'] = spark
0435 
0436     failure_count, test_count = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
0437     spark.stop()
0438     if failure_count:
0439         sys.exit(-1)