0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import sys
# Python 3 removed `basestring`; alias it to `str` so the isinstance
# checks below work on both major versions. Compare version_info rather
# than the version *string* to avoid lexicographic pitfalls
# (e.g. "10.0" < "3" as strings).
if sys.version_info[0] >= 3:
    basestring = str
0021
0022 from pyspark.rdd import RDD, ignore_unicode_prefix
0023 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
0024 from pyspark.mllib.linalg import Matrix, _convert_to_vector
0025 from pyspark.mllib.regression import LabeledPoint
0026 from pyspark.mllib.stat.test import ChiSqTestResult, KolmogorovSmirnovTestResult
0027
0028
# Public API of this module.
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
0030
0031
class MultivariateStatisticalSummary(JavaModelWrapper):

    """
    Trait for multivariate statistical summary of a data matrix.

    Each accessor delegates to the corresponding method on the wrapped
    JVM summary object.
    """

    def _vector(self, name):
        # Invoke the named JVM summarizer method and densify the
        # returned vector into a numpy array.
        return self.call(name).toArray()

    def mean(self):
        """Column-wise mean."""
        return self._vector("mean")

    def variance(self):
        """Column-wise variance."""
        return self._vector("variance")

    def count(self):
        """Number of rows seen, as a Python int."""
        return int(self.call("count"))

    def numNonzeros(self):
        """Column-wise count of non-zero entries."""
        return self._vector("numNonzeros")

    def max(self):
        """Column-wise maximum."""
        return self._vector("max")

    def min(self):
        """Column-wise minimum."""
        return self._vector("min")

    def normL1(self):
        """Column-wise L1 norm."""
        return self._vector("normL1")

    def normL2(self):
        """Column-wise Euclidean (L2) norm."""
        return self._vector("normL2")
0061
0062
class Statistics(object):

    @staticmethod
    def colStats(rdd):
        """
        Computes column-wise summary statistics for the input RDD[Vector].

        :param rdd: an RDD[Vector] for which column-wise summary statistics
                    are to be computed.
        :return: :class:`MultivariateStatisticalSummary` object containing
                 column-wise summary statistics.

        >>> from pyspark.mllib.linalg import Vectors
        >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
        ...                       Vectors.dense([4, 5, 0, 3]),
        ...                       Vectors.dense([6, 7, 0, 8])])
        >>> cStats = Statistics.colStats(rdd)
        >>> cStats.mean()
        array([ 4., 4., 0., 3.])
        >>> cStats.variance()
        array([ 4., 13., 0., 25.])
        >>> cStats.count()
        3
        >>> cStats.numNonzeros()
        array([ 3., 2., 0., 3.])
        >>> cStats.max()
        array([ 6., 7., 0., 8.])
        >>> cStats.min()
        array([ 2., 0., 0., -2.])
        """
        cStats = callMLlibFunc("colStats", rdd.map(_convert_to_vector))
        return MultivariateStatisticalSummary(cStats)

    @staticmethod
    def corr(x, y=None, method=None):
        """
        Compute the correlation (matrix) for the input RDD(s) using the
        specified method.
        Methods currently supported: `pearson (default), spearman`.

        If a single RDD of Vectors is passed in, a correlation matrix
        comparing the columns in the input RDD is returned. Use `method`
        to specify the method to be used for single RDD input.
        If two RDDs of floats are passed in, a single float is returned.

        :param x: an RDD of vector for which the correlation matrix is to be computed,
                  or an RDD of float of the same cardinality as y when y is specified.
        :param y: an RDD of float of the same cardinality as x.
        :param method: String specifying the method to use for computing correlation.
                       Supported: `pearson` (default), `spearman`
        :return: Correlation matrix comparing columns in x.

        >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
        >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
        >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
        >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
        True
        >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
        True
        >>> Statistics.corr(x, y, "spearman")
        0.5
        >>> from math import isnan
        >>> isnan(Statistics.corr(x, zeros))
        True
        >>> from pyspark.mllib.linalg import Vectors
        >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
        ...                       Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
        >>> pearsonCorr = Statistics.corr(rdd)
        >>> print(str(pearsonCorr).replace('nan', 'NaN'))
        [[ 1. 0.05564149 NaN 0.40047142]
        [ 0.05564149 1. NaN 0.91359586]
        [ NaN NaN 1. NaN]
        [ 0.40047142 0.91359586 NaN 1. ]]
        >>> spearmanCorr = Statistics.corr(rdd, method="spearman")
        >>> print(str(spearmanCorr).replace('nan', 'NaN'))
        [[ 1. 0.10540926 NaN 0.4 ]
        [ 0.10540926 1. NaN 0.9486833 ]
        [ NaN NaN 1. NaN]
        [ 0.4 0.9486833 NaN 1. ]]
        >>> try:
        ...     Statistics.corr(rdd, "spearman")
        ...     print("Method name as second argument without 'method=' shouldn't be allowed.")
        ... except TypeError:
        ...     pass
        """
        # A common mistake is passing the method name as the second positional
        # argument. Detect any string type (str, or unicode on Python 2 via the
        # `basestring` alias at module top) and fail fast with a clear message.
        # NOTE: `type(y) == str` would miss unicode strings and str subclasses.
        if isinstance(y, basestring):
            raise TypeError("Use 'method=' to specify method name.")

        if y is None:
            # Single RDD[Vector]: return the column correlation matrix.
            return callMLlibFunc("corr", x.map(_convert_to_vector), method).toArray()
        else:
            # Two RDDs of floats: return a single scalar correlation.
            return callMLlibFunc("corr", x.map(float), y.map(float), method)

    @staticmethod
    @ignore_unicode_prefix
    def chiSqTest(observed, expected=None):
        """
        If `observed` is Vector, conduct Pearson's chi-squared goodness
        of fit test of the observed data against the expected distribution,
        or against the uniform distribution (by default), with each category
        having an expected frequency of `1 / len(observed)`.

        If `observed` is matrix, conduct Pearson's independence test on the
        input contingency matrix, which cannot contain negative entries or
        columns or rows that sum up to 0.

        If `observed` is an RDD of LabeledPoint, conduct Pearson's independence
        test for every feature against the label across the input RDD.
        For each feature, the (feature, label) pairs are converted into a
        contingency matrix for which the chi-squared statistic is computed.
        All label and feature values must be categorical.

        .. note:: `observed` cannot contain negative values

        :param observed: it could be a vector containing the observed categorical
                         counts/relative frequencies, or the contingency matrix
                         (containing either counts or relative frequencies),
                         or an RDD of LabeledPoint containing the labeled dataset
                         with categorical features. Real-valued features will be
                         treated as categorical for each distinct value.
        :param expected: Vector containing the expected categorical counts/relative
                         frequencies. `expected` is rescaled if the `expected` sum
                         differs from the `observed` sum.
        :return: ChiSquaredTest object containing the test statistic, degrees
                 of freedom, p-value, the method used, and the null hypothesis.

        >>> from pyspark.mllib.linalg import Vectors, Matrices
        >>> observed = Vectors.dense([4, 6, 5])
        >>> pearson = Statistics.chiSqTest(observed)
        >>> print(pearson.statistic)
        0.4
        >>> pearson.degreesOfFreedom
        2
        >>> print(round(pearson.pValue, 4))
        0.8187
        >>> pearson.method
        u'pearson'
        >>> pearson.nullHypothesis
        u'observed follows the same distribution as expected.'

        >>> observed = Vectors.dense([21, 38, 43, 80])
        >>> expected = Vectors.dense([3, 5, 7, 20])
        >>> pearson = Statistics.chiSqTest(observed, expected)
        >>> print(round(pearson.pValue, 4))
        0.0027

        >>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
        >>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
        >>> print(round(chi.statistic, 4))
        21.9958

        >>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
        ...         LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
        ...         LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
        ...         LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
        ...         LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
        ...         LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
        >>> rdd = sc.parallelize(data, 4)
        >>> chi = Statistics.chiSqTest(rdd)
        >>> print(chi[0].statistic)
        0.75
        >>> print(chi[1].statistic)
        1.5
        """
        # RDD input: per-feature independence tests -> list of results.
        if isinstance(observed, RDD):
            if not isinstance(observed.first(), LabeledPoint):
                raise ValueError("observed should be an RDD of LabeledPoint")
            jmodels = callMLlibFunc("chiSqTest", observed)
            return [ChiSqTestResult(m) for m in jmodels]

        if isinstance(observed, Matrix):
            # Contingency matrix: independence test.
            jmodel = callMLlibFunc("chiSqTest", observed)
        else:
            # Vector (or vector-like) input: goodness-of-fit test.
            if expected and len(expected) != len(observed):
                raise ValueError("`expected` should have same length with `observed`")
            jmodel = callMLlibFunc("chiSqTest", _convert_to_vector(observed), expected)
        return ChiSqTestResult(jmodel)

    @staticmethod
    @ignore_unicode_prefix
    def kolmogorovSmirnovTest(data, distName="norm", *params):
        """
        Performs the Kolmogorov-Smirnov (KS) test for data sampled from
        a continuous distribution. It tests the null hypothesis that
        the data is generated from a particular distribution.

        The given data is sorted and the Empirical Cumulative
        Distribution Function (ECDF) is calculated
        which for a given point is the number of points having a CDF
        value lesser than it divided by the total number of points.

        Since the data is sorted, this is a step function
        that rises by (1 / length of data) for every ordered point.

        The KS statistic gives us the maximum distance between the
        ECDF and the CDF. Intuitively if this statistic is large, the
        probability that the null hypothesis is true becomes small.
        For specific details of the implementation, please have a look
        at the Scala documentation.

        :param data: RDD, samples from the data
        :param distName: string, currently only "norm" is supported.
                         (Normal distribution) to calculate the
                         theoretical distribution of the data.
        :param params: additional values which need to be provided for
                       a certain distribution.
                       If not provided, the default values are used.
        :return: KolmogorovSmirnovTestResult object containing the test
                 statistic, degrees of freedom, p-value,
                 the method used, and the null hypothesis.

        >>> kstest = Statistics.kolmogorovSmirnovTest
        >>> data = sc.parallelize([-1.0, 0.0, 1.0])
        >>> ksmodel = kstest(data, "norm")
        >>> print(round(ksmodel.pValue, 3))
        1.0
        >>> print(round(ksmodel.statistic, 3))
        0.175
        >>> ksmodel.nullHypothesis
        u'Sample follows theoretical distribution'

        >>> data = sc.parallelize([2.0, 3.0, 4.0])
        >>> ksmodel = kstest(data, "norm", 3.0, 1.0)
        >>> print(round(ksmodel.pValue, 3))
        1.0
        >>> print(round(ksmodel.statistic, 3))
        0.175
        """
        if not isinstance(data, RDD):
            raise TypeError("data should be an RDD, got %s." % type(data))
        if not isinstance(distName, basestring):
            raise TypeError("distName should be a string, got %s." % type(distName))

        # Distribution parameters must be floats for the JVM side.
        params = [float(param) for param in params]
        return KolmogorovSmirnovTestResult(
            callMLlibFunc("kolmogorovSmirnovTest", data, distName, params))
0302
0303
def _test():
    """Run this module's doctests against a local SparkSession."""
    import doctest
    import numpy
    from pyspark.sql import SparkSession

    # The doctest output in this file was written against numpy's pre-1.14
    # print format; older numpy versions reject the `legacy` option.
    try:
        numpy.set_printoptions(legacy='1.13')
    except TypeError:
        pass

    globs = globals().copy()
    spark = (SparkSession.builder
             .master("local[4]")
             .appName("mllib.stat.statistics tests")
             .getOrCreate())
    globs['sc'] = spark.sparkContext
    failures, _ = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    spark.stop()
    if failures:
        sys.exit(-1)


if __name__ == "__main__":
    _test()