Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019 Python package for random data generation.
0020 """
0021 
0022 import sys
0023 from functools import wraps
0024 
0025 from pyspark import since
0026 from pyspark.mllib.common import callMLlibFunc
0027 
0028 
0029 __all__ = ['RandomRDDs', ]
0030 
0031 
def toArray(f):
    """Decorator converting an RDD-of-Vectors factory into an RDD-of-ndarrays factory.

    Wraps a generator function ``f(sc, ...)`` whose result is an RDD of MLlib
    ``Vector`` objects, and returns a function with the same signature whose
    result RDD instead yields the ``numpy.ndarray`` form of each vector.
    """
    @wraps(f)
    def wrapper(sc, *args, **kwargs):
        vector_rdd = f(sc, *args, **kwargs)
        # Convert each MLlib Vector to its dense numpy representation.
        return vector_rdd.map(lambda vec: vec.toArray())
    return wrapper
0038 
0039 
class RandomRDDs(object):
    """
    Generator methods for creating RDDs comprised of i.i.d samples from
    some distribution.

    .. versionadded:: 1.1.0
    """

    @staticmethod
    @since("1.1.0")
    def uniformRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the
        uniform distribution U(0.0, 1.0).

        To transform the distribution in the generated RDD from U(0.0, 1.0)
        to U(a, b), use
        ``RandomRDDs.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)``

        :param sc: SparkContext used to create the RDD.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`.

        >>> x = RandomRDDs.uniformRDD(sc, 100).collect()
        >>> len(x)
        100
        >>> max(x) <= 1.0 and min(x) >= 0.0
        True
        >>> RandomRDDs.uniformRDD(sc, 100, 4).getNumPartitions()
        4
        >>> parts = RandomRDDs.uniformRDD(sc, 100, seed=4).getNumPartitions()
        >>> parts == sc.defaultParallelism
        True
        """
        return callMLlibFunc("uniformRDD", sc._jsc, size, numPartitions, seed)

    @staticmethod
    @since("1.1.0")
    def normalRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the standard normal
        distribution.

        To transform the distribution in the generated RDD from standard normal
        to some other normal N(mean, sigma^2), use
        ``RandomRDDs.normalRDD(sc, n, p, seed).map(lambda v: mean + sigma * v)``

        :param sc: SparkContext used to create the RDD.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).

        >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - 0.0) < 0.1
        True
        >>> abs(stats.stdev() - 1.0) < 0.1
        True
        """
        return callMLlibFunc("normalRDD", sc._jsc, size, numPartitions, seed)

    @staticmethod
    @since("1.3.0")
    def logNormalRDD(sc, mean, std, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the log normal
        distribution with the input mean and standard deviation.

        :param sc: SparkContext used to create the RDD.
        :param mean: mean for the log Normal distribution
        :param std: std for the log Normal distribution
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ log N(mean, std).

        >>> from math import sqrt, exp
        >>> mean = 0.0
        >>> std = 1.0
        >>> expMean = exp(mean + 0.5 * std * std)
        >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
        >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - expMean) < 0.5
        True
        >>> abs(stats.stdev() - expStd) < 0.5
        True
        """
        # mean/std are forced to float so the JVM side receives doubles.
        return callMLlibFunc("logNormalRDD", sc._jsc, float(mean), float(std),
                             size, numPartitions, seed)

    @staticmethod
    @since("1.1.0")
    def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Poisson
        distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or lambda, for the Poisson distribution.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Pois(mean).

        >>> mean = 100.0
        >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)

    @staticmethod
    @since("1.3.0")
    def exponentialRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Exponential
        distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or 1 / lambda, for the Exponential distribution.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Exp(mean).

        >>> mean = 2.0
        >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("exponentialRDD", sc._jsc, float(mean), size, numPartitions, seed)

    @staticmethod
    @since("1.3.0")
    def gammaRDD(sc, shape, scale, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Gamma
        distribution with the input shape and scale.

        :param sc: SparkContext used to create the RDD.
        :param shape: shape (> 0) parameter for the Gamma distribution
        :param scale: scale (> 0) parameter for the Gamma distribution
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Gamma(shape, scale).

        >>> from math import sqrt
        >>> shape = 1.0
        >>> scale = 2.0
        >>> expMean = shape * scale
        >>> expStd = sqrt(shape * scale * scale)
        >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - expMean) < 0.5
        True
        >>> abs(stats.stdev() - expStd) < 0.5
        True
        """
        return callMLlibFunc("gammaRDD", sc._jsc, float(shape),
                             float(scale), size, numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.1.0")
    def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the uniform distribution U(0.0, 1.0).

        :param sc: SparkContext used to create the RDD.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Seed for the RNG that generates the seed for the generator in each partition.
        :return: RDD of Vector with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
        >>> mat.shape
        (10, 10)
        >>> mat.max() <= 1.0 and mat.min() >= 0.0
        True
        >>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
        4
        """
        return callMLlibFunc("uniformVectorRDD", sc._jsc, numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.1.0")
    def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the standard normal distribution.

        :param sc: SparkContext used to create the RDD.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - 0.0) < 0.1
        True
        >>> abs(mat.std() - 1.0) < 0.1
        True
        """
        return callMLlibFunc("normalVectorRDD", sc._jsc, numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.3.0")
    def logNormalVectorRDD(sc, mean, std, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the log normal distribution.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean of the log normal distribution
        :param std: Standard Deviation of the log normal distribution
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ log `N(mean, std)`.

        >>> import numpy as np
        >>> from math import sqrt, exp
        >>> mean = 0.0
        >>> std = 1.0
        >>> expMean = exp(mean + 0.5 * std * std)
        >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
        >>> m = RandomRDDs.logNormalVectorRDD(sc, mean, std, 100, 100, seed=1).collect()
        >>> mat = np.matrix(m)
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - expMean) < 0.1
        True
        >>> abs(mat.std() - expStd) < 0.1
        True
        """
        return callMLlibFunc("logNormalVectorRDD", sc._jsc, float(mean), float(std),
                             numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.1.0")
    def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Poisson distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or lambda, for the Poisson distribution.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).

        >>> import numpy as np
        >>> mean = 100.0
        >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(mat.std() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
                             numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.3.0")
    def exponentialVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Exponential distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or 1 / lambda, for the Exponential distribution.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Exp(mean).

        >>> import numpy as np
        >>> mean = 0.5
        >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(mat.std() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("exponentialVectorRDD", sc._jsc, float(mean), numRows, numCols,
                             numPartitions, seed)

    @staticmethod
    @toArray
    @since("1.3.0")
    def gammaVectorRDD(sc, shape, scale, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Gamma distribution.

        :param sc: SparkContext used to create the RDD.
        :param shape: Shape (> 0) of the Gamma distribution
        :param scale: Scale (> 0) of the Gamma distribution
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Gamma(shape, scale).

        >>> import numpy as np
        >>> from math import sqrt
        >>> shape = 1.0
        >>> scale = 2.0
        >>> expMean = shape * scale
        >>> expStd = sqrt(shape * scale * scale)
        >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, 100, 100, seed=1).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - expMean) < 0.1
        True
        >>> abs(mat.std() - expStd) < 0.1
        True
        """
        return callMLlibFunc("gammaVectorRDD", sc._jsc, float(shape), float(scale),
                             numRows, numCols, numPartitions, seed)
0407 
0408 
def _test():
    """Run this module's doctests against a local 2-core SparkSession."""
    import doctest
    from pyspark.sql import SparkSession

    test_globals = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    session = SparkSession.builder\
        .master("local[2]")\
        .appName("mllib.random tests")\
        .getOrCreate()
    # Doctests reference `sc` directly, so expose the underlying SparkContext.
    test_globals['sc'] = session.sparkContext
    failure_count, _ = doctest.testmod(globs=test_globals, optionflags=doctest.ELLIPSIS)
    session.stop()
    if failure_count:
        sys.exit(-1)
0424 
0425 
# Allow the module to be executed directly to run its doctest suite.
if __name__ == "__main__":
    _test()