#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from pyspark import since, SparkContext
from pyspark.ml.common import _java2py, _py2java
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.wrapper import JavaWrapper, _jvm
from pyspark.sql.column import Column, _to_seq
from pyspark.sql.functions import lit


class ChiSquareTest(object):
    """
    Conduct Pearson's independence test for every feature against the label. For each feature,
    the (feature, label) pairs are converted into a contingency matrix for which the Chi-squared
    statistic is computed. All label and feature values must be categorical.

    The null hypothesis is that the occurrence of the outcomes is statistically independent.

    .. versionadded:: 2.2.0

    """
    @staticmethod
    @since("2.2.0")
    def test(dataset, featuresCol, labelCol):
0042 """
0043 Perform a Pearson's independence test using dataset.
0044
0045 :param dataset:
0046 DataFrame of categorical labels and categorical features.
0047 Real-valued features will be treated as categorical for each distinct value.
0048 :param featuresCol:
0049 Name of features column in dataset, of type `Vector` (`VectorUDT`).
0050 :param labelCol:
0051 Name of label column in dataset, of any numerical type.
0052 :return:
0053 DataFrame containing the test result for every feature against the label.
0054 This DataFrame will contain a single Row with the following fields:
0055 - `pValues: Vector`
0056 - `degreesOfFreedom: Array[Int]`
0057 - `statistics: Vector`
0058 Each of these fields has one value per feature.
0059
0060 >>> from pyspark.ml.linalg import Vectors
0061 >>> from pyspark.ml.stat import ChiSquareTest
0062 >>> dataset = [[0, Vectors.dense([0, 0, 1])],
0063 ... [0, Vectors.dense([1, 0, 1])],
0064 ... [1, Vectors.dense([2, 1, 1])],
0065 ... [1, Vectors.dense([3, 1, 1])]]
0066 >>> dataset = spark.createDataFrame(dataset, ["label", "features"])
0067 >>> chiSqResult = ChiSquareTest.test(dataset, 'features', 'label')
0068 >>> chiSqResult.select("degreesOfFreedom").collect()[0]
0069 Row(degreesOfFreedom=[3, 1, 0])
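
        A hedged follow-up sketch: `pValues` and `statistics` hold one entry per
        feature, so for the three-dimensional vectors above both have length 3
        (the check below is illustrative, not from the original docs):

        >>> row = chiSqResult.collect()[0]
        >>> len(row.pValues) == 3 and len(row.statistics) == 3
        True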
0070 """
0071 sc = SparkContext._active_spark_context
0072 javaTestObj = _jvm().org.apache.spark.ml.stat.ChiSquareTest
0073 args = [_py2java(sc, arg) for arg in (dataset, featuresCol, labelCol)]
0074 return _java2py(sc, javaTestObj.test(*args))


class Correlation(object):
    """
    Compute the correlation matrix for the input dataset of Vectors using the specified method.
    Methods currently supported: `pearson` (default), `spearman`.

    .. note:: For Spearman, a rank correlation, we need to create an RDD[Double] for each column
      and sort it in order to retrieve the ranks and then join the columns back into an
      RDD[Vector], which is fairly costly. Cache the input Dataset before calling corr with
      `method = 'spearman'` to avoid recomputing the common lineage.

    .. versionadded:: 2.2.0

    """
    @staticmethod
    @since("2.2.0")
    def corr(dataset, column, method="pearson"):
0093 """
0094 Compute the correlation matrix with specified method using dataset.
0095
0096 :param dataset:
0097 A Dataset or a DataFrame.
0098 :param column:
0099 The name of the column of vectors for which the correlation coefficient needs
0100 to be computed. This must be a column of the dataset, and it must contain
0101 Vector objects.
0102 :param method:
0103 String specifying the method to use for computing correlation.
0104 Supported: `pearson` (default), `spearman`.
0105 :return:
0106 A DataFrame that contains the correlation matrix of the column of vectors. This
0107 DataFrame contains a single row and a single column of name
0108 '$METHODNAME($COLUMN)'.
0109
0110 >>> from pyspark.ml.linalg import Vectors
0111 >>> from pyspark.ml.stat import Correlation
0112 >>> dataset = [[Vectors.dense([1, 0, 0, -2])],
0113 ... [Vectors.dense([4, 5, 0, 3])],
0114 ... [Vectors.dense([6, 7, 0, 8])],
0115 ... [Vectors.dense([9, 0, 0, 1])]]
0116 >>> dataset = spark.createDataFrame(dataset, ['features'])
0117 >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
0118 >>> print(str(pearsonCorr).replace('nan', 'NaN'))
0119 DenseMatrix([[ 1. , 0.0556..., NaN, 0.4004...],
0120 [ 0.0556..., 1. , NaN, 0.9135...],
0121 [ NaN, NaN, 1. , NaN],
0122 [ 0.4004..., 0.9135..., NaN, 1. ]])
0123 >>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
0124 >>> print(str(spearmanCorr).replace('nan', 'NaN'))
0125 DenseMatrix([[ 1. , 0.1054..., NaN, 0.4 ],
0126 [ 0.1054..., 1. , NaN, 0.9486... ],
0127 [ NaN, NaN, 1. , NaN],
0128 [ 0.4 , 0.9486... , NaN, 1. ]])
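
        A hedged extra step: the collected value is a `DenseMatrix`, and its
        `toArray()` method returns a NumPy array (the shape check is illustrative):

        >>> pearsonCorr.toArray().shape
        (4, 4)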
0129 """
0130 sc = SparkContext._active_spark_context
0131 javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
0132 args = [_py2java(sc, arg) for arg in (dataset, column, method)]
0133 return _java2py(sc, javaCorrObj.corr(*args))


class KolmogorovSmirnovTest(object):
    """
    Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a continuous
    distribution.

    By comparing the largest difference between the empirical cumulative
    distribution of the sample data and the theoretical distribution, we can provide a test for
    the null hypothesis that the sample data comes from that theoretical distribution.

    .. versionadded:: 2.4.0

    """
    @staticmethod
    @since("2.4.0")
    def test(dataset, sampleCol, distName, *params):
0151 """
0152 Conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability distribution
0153 equality. Currently supports the normal distribution, taking as parameters the mean and
0154 standard deviation.
0155
0156 :param dataset:
0157 a Dataset or a DataFrame containing the sample of data to test.
0158 :param sampleCol:
0159 Name of sample column in dataset, of any numerical type.
0160 :param distName:
0161 a `string` name for a theoretical distribution, currently only support "norm".
0162 :param params:
0163 a list of `Double` values specifying the parameters to be used for the theoretical
0164 distribution. For "norm" distribution, the parameters includes mean and variance.
0165 :return:
0166 A DataFrame that contains the Kolmogorov-Smirnov test result for the input sampled data.
0167 This DataFrame will contain a single Row with the following fields:
0168 - `pValue: Double`
0169 - `statistic: Double`
0170
0171 >>> from pyspark.ml.stat import KolmogorovSmirnovTest
0172 >>> dataset = [[-1.0], [0.0], [1.0]]
0173 >>> dataset = spark.createDataFrame(dataset, ['sample'])
0174 >>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).first()
0175 >>> round(ksResult.pValue, 3)
0176 1.0
0177 >>> round(ksResult.statistic, 3)
0178 0.175
0179 >>> dataset = [[2.0], [3.0], [4.0]]
0180 >>> dataset = spark.createDataFrame(dataset, ['sample'])
0181 >>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).first()
0182 >>> round(ksResult.pValue, 3)
0183 1.0
0184 >>> round(ksResult.statistic, 3)
0185 0.175
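
        A hedged structural check, assuming only the two result fields listed above:

        >>> sorted(ksResult.asDict())
        ['pValue', 'statistic']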
0186 """
0187 sc = SparkContext._active_spark_context
0188 javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest
0189 dataset = _py2java(sc, dataset)
0190 params = [float(param) for param in params]
0191 return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName,
0192 _jvm().PythonUtils.toSeq(params)))


class Summarizer(object):
    """
    Tools for vectorized statistics on MLlib Vectors.

    The methods in this package provide various statistics for Vectors contained inside
    DataFrames.

    This class lets users pick the statistics they would like to extract for a given column.

    >>> from pyspark.ml.stat import Summarizer
    >>> from pyspark.sql import Row
    >>> from pyspark.ml.linalg import Vectors
    >>> summarizer = Summarizer.metrics("mean", "count")
    >>> df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
    ...                      Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()
    >>> df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)
    +-----------------------------------+
    |aggregate_metrics(features, weight)|
    +-----------------------------------+
    |[[1.0,1.0,1.0], 1]                 |
    +-----------------------------------+
    <BLANKLINE>
    >>> df.select(summarizer.summary(df.features)).show(truncate=False)
    +--------------------------------+
    |aggregate_metrics(features, 1.0)|
    +--------------------------------+
    |[[1.0,1.5,2.0], 2]              |
    +--------------------------------+
    <BLANKLINE>
    >>> df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)
    +--------------+
    |mean(features)|
    +--------------+
    |[1.0,1.0,1.0] |
    +--------------+
    <BLANKLINE>
    >>> df.select(Summarizer.mean(df.features)).show(truncate=False)
    +--------------+
    |mean(features)|
    +--------------+
    |[1.0,1.5,2.0] |
    +--------------+
    <BLANKLINE>
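
    A hedged sketch of the per-metric shortcuts (element-wise min/max; the
    expected values follow from the two unweighted rows above):

    >>> minmax = df.select(Summarizer.min(df.features), Summarizer.max(df.features)).first()
    >>> list(minmax[0]) == [1.0, 1.0, 1.0] and list(minmax[1]) == [1.0, 2.0, 3.0]
    True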

    .. versionadded:: 2.4.0

    """
    @staticmethod
    @since("2.4.0")
    def mean(col, weightCol=None):
        """
        Returns a column of the mean summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "mean")

    @staticmethod
    @since("3.0.0")
    def sum(col, weightCol=None):
        """
        Returns a column of the sum summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "sum")

    @staticmethod
    @since("2.4.0")
    def variance(col, weightCol=None):
        """
        Returns a column of the variance summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "variance")

    @staticmethod
    @since("3.0.0")
    def std(col, weightCol=None):
        """
        Returns a column of the standard deviation summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "std")

    @staticmethod
    @since("2.4.0")
    def count(col, weightCol=None):
        """
        Returns a column of the count summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "count")

    @staticmethod
    @since("2.4.0")
    def numNonZeros(col, weightCol=None):
        """
        Returns a column of the numNonZeros summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "numNonZeros")

    @staticmethod
    @since("2.4.0")
    def max(col, weightCol=None):
        """
        Returns a column of the max summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "max")

    @staticmethod
    @since("2.4.0")
    def min(col, weightCol=None):
        """
        Returns a column of the min summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "min")

    @staticmethod
    @since("2.4.0")
    def normL1(col, weightCol=None):
        """
        Returns a column of the normL1 summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "normL1")

    @staticmethod
    @since("2.4.0")
    def normL2(col, weightCol=None):
        """
        Returns a column of the normL2 summary.
        """
        return Summarizer._get_single_metric(col, weightCol, "normL2")

    @staticmethod
    def _check_param(featuresCol, weightCol):
        if weightCol is None:
            weightCol = lit(1.0)
        if not isinstance(featuresCol, Column) or not isinstance(weightCol, Column):
            raise TypeError("featuresCol and weightCol should be a Column")
        return featuresCol, weightCol

    @staticmethod
    def _get_single_metric(col, weightCol, metric):
        col, weightCol = Summarizer._check_param(col, weightCol)
        return Column(JavaWrapper._new_java_obj("org.apache.spark.ml.stat.Summarizer." + metric,
                                                col._jc, weightCol._jc))

    @staticmethod
    @since("2.4.0")
    def metrics(*metrics):
        """
        Given a list of metrics, provides a builder that in turn computes metrics from a column.

        See the documentation of :py:class:`Summarizer` for an example.

        The following metrics are accepted (case sensitive):
         - mean: a vector that contains the coefficient-wise mean.
         - sum: a vector that contains the coefficient-wise sum.
         - variance: a vector that contains the coefficient-wise variance.
         - std: a vector that contains the coefficient-wise standard deviation.
         - count: the count of all vectors seen.
         - numNonZeros: a vector with the number of non-zeros for each coefficient.
         - max: the maximum for each coefficient.
         - min: the minimum for each coefficient.
         - normL2: the Euclidean norm for each coefficient.
         - normL1: the L1 norm of each coefficient (sum of the absolute values).

        :param metrics:
          the names of the metrics to compute.
        :return:
          an object of :py:class:`pyspark.ml.stat.SummaryBuilder`

        Note: Currently, the performance of this interface is about 2x~3x slower than using the
        RDD interface.
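
        A hedged sketch (metric names taken from the list above); a single builder
        computes all requested metrics in one pass over the data:

        >>> multi = Summarizer.metrics("min", "max", "count")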
0360 """
0361 sc = SparkContext._active_spark_context
0362 js = JavaWrapper._new_java_obj("org.apache.spark.ml.stat.Summarizer.metrics",
0363 _to_seq(sc, metrics))
0364 return SummaryBuilder(js)


class SummaryBuilder(JavaWrapper):
    """
    A builder object that provides summary statistics about a given column.

    Users should not directly create such builders, but instead use one of the methods in
    :py:class:`pyspark.ml.stat.Summarizer`.

    .. versionadded:: 2.4.0

    """
    def __init__(self, jSummaryBuilder):
        super(SummaryBuilder, self).__init__(jSummaryBuilder)

    @since("2.4.0")
    def summary(self, featuresCol, weightCol=None):
        """
        Returns an aggregate object that contains the summary of the column with the requested
        metrics.

        :param featuresCol:
          a column that contains the features Vector object.
        :param weightCol:
          a column that contains the weight value. Default weight is 1.0.
        :return:
          an aggregate column that contains the statistics. The exact content of this
          structure is determined during the creation of the builder.
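
        A hedged end-to-end sketch (the one-row DataFrame and the expected
        `Row(count=1)` output are illustrative assumptions):

        >>> from pyspark.ml.linalg import Vectors
        >>> df = spark.createDataFrame([(Vectors.dense(1.0, 2.0),)], ["features"])
        >>> df.select(Summarizer.metrics("count").summary(df.features)).first()[0]
        Row(count=1)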
0393 """
0394 featuresCol, weightCol = Summarizer._check_param(featuresCol, weightCol)
0395 return Column(self._java_obj.summary(featuresCol._jc, weightCol._jc))


class MultivariateGaussian(object):
    """Represents a (mean, cov) tuple.

    >>> m = MultivariateGaussian(Vectors.dense([11,12]), DenseMatrix(2, 2, (1.0, 3.0, 5.0, 2.0)))
    >>> (m.mean, m.cov.toArray())
    (DenseVector([11.0, 12.0]), array([[ 1.,  5.],
           [ 3.,  2.]]))
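
    Worth making explicit: `DenseMatrix` stores its values column-major, so
    (1.0, 3.0) fills the first column above (the index check is illustrative):

    >>> m.cov.toArray()[0, 1] == 5.0
    True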

    .. versionadded:: 3.0.0

    """
    def __init__(self, mean, cov):
        self.mean = mean
        self.cov = cov


if __name__ == "__main__":
    import doctest
    import numpy
    import pyspark.ml.stat
    from pyspark.sql import SparkSession

    try:
        # NumPy 1.14+ changed the default array repr; keep the 1.13 style so the
        # doctest outputs above stay stable.
        numpy.set_printoptions(legacy='1.13')
    except TypeError:
        pass

    globs = pyspark.ml.stat.__dict__.copy()
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("ml.stat tests") \
        .getOrCreate()
    sc = spark.sparkContext
    globs['sc'] = sc
    globs['spark'] = spark

    failure_count, test_count = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    spark.stop()
    if failure_count:
        sys.exit(-1)