0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
0019 Python package for random data generation.
0020 """
0021
0022 import sys
0023 from functools import wraps
0024
0025 from pyspark import since
0026 from pyspark.mllib.common import callMLlibFunc
0027
0028
0029 __all__ = ['RandomRDDs', ]
0030
0031
def toArray(f):
    """Decorator that converts each MLlib Vector in the RDD returned by the
    wrapped generator function into a numpy array (via ``Vector.toArray``)."""
    @wraps(f)
    def wrapper(sc, *args, **kwargs):
        vectors = f(sc, *args, **kwargs)
        return vectors.map(lambda v: v.toArray())
    return wrapper
0038
0039
class RandomRDDs(object):
    """
    Generator methods for creating RDDs comprised of i.i.d samples from
    some distribution.

    .. versionadded:: 1.1.0
    """

    @staticmethod
    @since("1.1.0")
    def uniformRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the
        uniform distribution U(0.0, 1.0).

        To transform the distribution in the generated RDD from U(0.0, 1.0)
        to U(a, b), use
        ``RandomRDDs.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)``

        :param sc: SparkContext used to create the RDD.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`.

        >>> x = RandomRDDs.uniformRDD(sc, 100).collect()
        >>> len(x)
        100
        >>> max(x) <= 1.0 and min(x) >= 0.0
        True
        >>> RandomRDDs.uniformRDD(sc, 100, 4).getNumPartitions()
        4
        >>> parts = RandomRDDs.uniformRDD(sc, 100, seed=4).getNumPartitions()
        >>> parts == sc.defaultParallelism
        True
        """
        return callMLlibFunc("uniformRDD", sc._jsc, size, numPartitions, seed)

    @staticmethod
    @since("1.1.0")
    def normalRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the standard normal
        distribution.

        To transform the distribution in the generated RDD from standard normal
        to some other normal N(mean, sigma^2), use
        ``RandomRDDs.normalRDD(sc, n, p, seed).map(lambda v: mean + sigma * v)``

        :param sc: SparkContext used to create the RDD.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).

        >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - 0.0) < 0.1
        True
        >>> abs(stats.stdev() - 1.0) < 0.1
        True
        """
        return callMLlibFunc("normalRDD", sc._jsc, size, numPartitions, seed)

    @staticmethod
    @since("1.3.0")
    def logNormalRDD(sc, mean, std, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the log normal
        distribution with the input mean and standard deviation.

        :param sc: SparkContext used to create the RDD.
        :param mean: mean for the log Normal distribution
        :param std: std for the log Normal distribution
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ log N(mean, std).

        >>> from math import sqrt, exp
        >>> mean = 0.0
        >>> std = 1.0
        >>> expMean = exp(mean + 0.5 * std * std)
        >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
        >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - expMean) < 0.5
        True
        >>> abs(stats.stdev() - expStd) < 0.5
        True
        """
        return callMLlibFunc("logNormalRDD", sc._jsc, float(mean), float(std),
                             size, numPartitions, seed)

    @staticmethod
    @since("1.1.0")
    def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Poisson
        distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or lambda, for the Poisson distribution.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Pois(mean).

        >>> mean = 100.0
        >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)

    @staticmethod
    @since("1.3.0")
    def exponentialRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Exponential
        distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or 1 / lambda, for the Exponential distribution.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Exp(mean).

        >>> mean = 2.0
        >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("exponentialRDD", sc._jsc, float(mean), size, numPartitions, seed)

    @staticmethod
    @since("1.3.0")
    def gammaRDD(sc, shape, scale, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Gamma
        distribution with the input shape and scale.

        :param sc: SparkContext used to create the RDD.
        :param shape: shape (> 0) parameter for the Gamma distribution
        :param scale: scale (> 0) parameter for the Gamma distribution
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Gamma(shape, scale).

        >>> from math import sqrt
        >>> shape = 1.0
        >>> scale = 2.0
        >>> expMean = shape * scale
        >>> expStd = sqrt(shape * scale * scale)
        >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - expMean) < 0.5
        True
        >>> abs(stats.stdev() - expStd) < 0.5
        True
        """
        return callMLlibFunc("gammaRDD", sc._jsc, float(shape),
                             float(scale), size, numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.1.0")
    def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the uniform distribution U(0.0, 1.0).

        :param sc: SparkContext used to create the RDD.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Seed for the RNG that generates the seed for the generator in each
          partition (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
        >>> mat.shape
        (10, 10)
        >>> mat.max() <= 1.0 and mat.min() >= 0.0
        True
        >>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
        4
        """
        return callMLlibFunc("uniformVectorRDD", sc._jsc, numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.1.0")
    def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the standard normal distribution.

        :param sc: SparkContext used to create the RDD.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - 0.0) < 0.1
        True
        >>> abs(mat.std() - 1.0) < 0.1
        True
        """
        return callMLlibFunc("normalVectorRDD", sc._jsc, numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.3.0")
    def logNormalVectorRDD(sc, mean, std, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the log normal distribution.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean of the log normal distribution
        :param std: Standard Deviation of the log normal distribution
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ log `N(mean, std)`.

        >>> import numpy as np
        >>> from math import sqrt, exp
        >>> mean = 0.0
        >>> std = 1.0
        >>> expMean = exp(mean + 0.5 * std * std)
        >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
        >>> m = RandomRDDs.logNormalVectorRDD(sc, mean, std, 100, 100, seed=1).collect()
        >>> mat = np.matrix(m)
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - expMean) < 0.1
        True
        >>> abs(mat.std() - expStd) < 0.1
        True
        """
        return callMLlibFunc("logNormalVectorRDD", sc._jsc, float(mean), float(std),
                             numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.1.0")
    def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Poisson distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or lambda, for the Poisson distribution.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).

        >>> import numpy as np
        >>> mean = 100.0
        >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(mat.std() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
                             numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.3.0")
    def exponentialVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Exponential distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or 1 / lambda, for the Exponential distribution.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Exp(mean).

        >>> import numpy as np
        >>> mean = 0.5
        >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(mat.std() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("exponentialVectorRDD", sc._jsc, float(mean), numRows, numCols,
                             numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.3.0")
    def gammaVectorRDD(sc, shape, scale, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Gamma distribution.

        :param sc: SparkContext used to create the RDD.
        :param shape: Shape (> 0) of the Gamma distribution
        :param scale: Scale (> 0) of the Gamma distribution
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Gamma(shape, scale).

        >>> import numpy as np
        >>> from math import sqrt
        >>> shape = 1.0
        >>> scale = 2.0
        >>> expMean = shape * scale
        >>> expStd = sqrt(shape * scale * scale)
        >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, 100, 100, seed=1).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - expMean) < 0.1
        True
        >>> abs(mat.std() - expStd) < 0.1
        True
        """
        return callMLlibFunc("gammaVectorRDD", sc._jsc, float(shape), float(scale),
                             numRows, numCols, numPartitions, seed)
0407
0408
def _test():
    """Run this module's doctests against a local two-core Spark session."""
    import doctest
    from pyspark.sql import SparkSession

    globs = globals().copy()
    # The doctests reference `sc`; bind it to a freshly created SparkContext.
    spark = SparkSession.builder\
        .master("local[2]")\
        .appName("mllib.random tests")\
        .getOrCreate()
    globs['sc'] = spark.sparkContext
    failure_count, test_count = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    spark.stop()
    # Propagate a failing exit status so CI notices doctest failures.
    if failure_count:
        sys.exit(-1)
0424
0425
# Execute the doctest suite when this file is run as a script.
if __name__ == "__main__":
    _test()